diff --git a/policy-controller/src/index_list.rs b/policy-controller/src/index_list.rs
new file mode 100644
index 000000000..c089af30c
--- /dev/null
+++ b/policy-controller/src/index_list.rs
@@ -0,0 +1,59 @@
+use std::sync::Arc;
+
+use kubert::index::IndexNamespacedResource;
+use parking_lot::RwLock;
+
+/// A list of indexes for a specific resource type.
+///
+/// An `IndexList` itself can then act as an index for that resource, and fans updates
+/// out to each index in the list by cloning the update.
+pub struct IndexList<A, T> {
+    index: Arc<RwLock<A>>,
+    tail: Option<T>,
+}
+
+impl<A, T, R> IndexNamespacedResource<R> for IndexList<A, T>
+where
+    A: IndexNamespacedResource<R>,
+    T: IndexNamespacedResource<R>,
+    R: Clone,
+{
+    fn apply(&mut self, resource: R) {
+        if let Some(tail) = &mut self.tail {
+            tail.apply(resource.clone());
+        }
+        self.index.write().apply(resource);
+    }
+
+    fn delete(&mut self, namespace: String, name: String) {
+        if let Some(tail) = &mut self.tail {
+            tail.delete(namespace.clone(), name.clone());
+        }
+        self.index.write().delete(namespace, name);
+    }
+}
+
+impl<A, T> IndexList<A, T> {
+    pub fn push<B>(self, index: Arc<RwLock<B>>) -> IndexList<B, IndexList<A, T>> {
+        IndexList {
+            index,
+            tail: Some(self),
+        }
+    }
+
+    pub fn shared(self) -> Arc<RwLock<Self>> {
+        Arc::new(RwLock::new(self))
+    }
+}
+
+impl<A> IndexList<A, A> {
+    /// Returns a new `IndexList`.
+    ///
+    /// The second type parameter in the return value here can be anything that
+    /// implements `IndexNamespacedResource`, since it will just be `None`.
+    /// Ideally, the type should be `!` (bottom) but `A` is conveniently available,
+    /// so we use that.
+    pub fn new(index: Arc<RwLock<A>>) -> IndexList<A, A> {
+        IndexList { index, tail: None }
+    }
+}
diff --git a/policy-controller/src/index_pair.rs b/policy-controller/src/index_pair.rs
deleted file mode 100644
index a0a08cef9..000000000
--- a/policy-controller/src/index_pair.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-use std::sync::Arc;
-
-use kubert::index::IndexNamespacedResource;
-use parking_lot::RwLock;
-
-/// IndexPair holds a two indexes and forwards resource updates to both indexes
-/// by cloning the update.
-pub struct IndexPair<A, B> {
-    first: Arc<RwLock<A>>,
-    second: Arc<RwLock<B>>,
-}
-
-impl<A, B, R> IndexNamespacedResource<R> for IndexPair<A, B>
-where
-    A: IndexNamespacedResource<R>,
-    B: IndexNamespacedResource<R>,
-    R: Clone,
-{
-    fn apply(&mut self, resource: R) {
-        self.first.write().apply(resource.clone());
-        self.second.write().apply(resource);
-    }
-
-    fn delete(&mut self, namespace: String, name: String) {
-        self.first.write().delete(namespace.clone(), name.clone());
-        self.second.write().delete(namespace, name);
-    }
-}
-
-impl<A, B> IndexPair<A, B> {
-    pub fn shared(first: Arc<RwLock<A>>, second: Arc<RwLock<B>>) -> Arc<RwLock<Self>> {
-        Arc::new(RwLock::new(Self { first, second }))
-    }
-}
diff --git a/policy-controller/src/lib.rs b/policy-controller/src/lib.rs
index b60fc7edf..5e100ed1b 100644
--- a/policy-controller/src/lib.rs
+++ b/policy-controller/src/lib.rs
@@ -1,10 +1,8 @@
 #![deny(warnings, rust_2018_idioms)]
 #![forbid(unsafe_code)]
 mod admission;
-mod index_pair;
-
+pub mod index_list;
 pub use self::admission::Admission;
-pub use self::index_pair::IndexPair;
 use anyhow::Result;
 use linkerd_policy_controller_core::inbound::{
     DiscoverInboundServer, InboundServer, InboundServerStream,
diff --git a/policy-controller/src/main.rs b/policy-controller/src/main.rs
index a059b6a2d..a1895b3b7 100644
--- a/policy-controller/src/main.rs
+++ b/policy-controller/src/main.rs
@@ -6,8 +6,8 @@ use clap::Parser;
 use futures::prelude::*;
 use kube::api::ListParams;
 use linkerd_policy_controller::{
-    grpc, inbound, k8s, outbound, Admission, ClusterInfo, DefaultPolicy, InboundDiscover,
-    IndexPair, IpNet, OutboundDiscover,
+    grpc, inbound, index_list::IndexList, k8s, outbound, Admission, ClusterInfo, DefaultPolicy,
+    InboundDiscover, IpNet, OutboundDiscover,
 };
 use linkerd_policy_controller_k8s_index::ports::parse_portset;
 use linkerd_policy_controller_k8s_status::{self as status};
@@ -130,13 +131,31 @@ async fn main() -> Result<()> {
         probe_networks,
     });
 
-    // Build the index data structure, which will be used to process events from all watches
-    // The lookup handle is used by the gRPC server.
+    // Create the lease manager used for trying to claim the policy
+    // controller write lease.
+    let api = k8s::Api::namespaced(runtime.client(), &control_plane_namespace);
+    // todo: Do we need to use LeaseManager::field_manager here?
+    let lease = kubert::lease::LeaseManager::init(api, LEASE_NAME).await?;
+    let hostname =
+        std::env::var("HOSTNAME").expect("Failed to fetch `HOSTNAME` environment variable");
+    let params = kubert::lease::ClaimParams {
+        lease_duration: LEASE_DURATION,
+        renew_grace_period: RENEW_GRACE_PERIOD,
+    };
+    let (claims, _task) = lease.spawn(hostname.clone(), params).await?;
+
+    // Build the API index data structures which will maintain information
+    // necessary for serving the inbound policy and outbound policy gRPC APIs.
     let inbound_index = inbound::Index::shared(cluster_info.clone());
     let outbound_index = outbound::Index::shared(cluster_info);
-    let indexes = IndexPair::shared(inbound_index.clone(), outbound_index.clone());
-
-    // Spawn resource indexers that update the index and publish lookups for the gRPC server.
+    // Build the status index which will maintain information necessary for
+    // updating the status field of policy resources.
+    let (updates_tx, updates_rx) = mpsc::unbounded_channel();
+    let status_index = status::Index::shared(hostname.clone(), claims.clone(), updates_tx);
+
+    // Spawn resource watches.
+
     let pods = runtime.watch_all::<k8s::Pod>(ListParams::default().labels("linkerd.io/control-plane-ns"));
     tokio::spawn(
         kubert::index::namespaced(inbound_index.clone(), pods).instrument(info_span!("pods")),
@@ -144,8 +162,11 @@ async fn main() -> Result<()> {
     );
 
     let servers = runtime.watch_all::<k8s::policy::Server>(ListParams::default());
+    let servers_indexes = IndexList::new(inbound_index.clone())
+        .push(status_index.clone())
+        .shared();
     tokio::spawn(
-        kubert::index::namespaced(inbound_index.clone(), servers).instrument(info_span!("servers")),
+        kubert::index::namespaced(servers_indexes, servers).instrument(info_span!("servers")),
     );
 
     let server_authzs =
@@ -177,50 +198,24 @@ async fn main() -> Result<()> {
     );
 
     let http_routes = runtime.watch_all::<k8s::policy::HttpRoute>(ListParams::default());
+    let http_routes_indexes = IndexList::new(inbound_index.clone())
+        .push(outbound_index.clone())
+        .push(status_index.clone())
+        .shared();
     tokio::spawn(
-        kubert::index::namespaced(indexes, http_routes).instrument(info_span!("httproutes")),
+        kubert::index::namespaced(http_routes_indexes, http_routes)
+            .instrument(info_span!("httproutes")),
     );
 
     let services = runtime.watch_all::<k8s::Service>(ListParams::default());
-
     tokio::spawn(
         kubert::index::namespaced(outbound_index.clone(), services)
             .instrument(info_span!("services")),
     );
 
-    // Create the lease manager used for trying to claim the policy
-    // controller write lease.
-    let api = k8s::Api::namespaced(runtime.client(), &control_plane_namespace);
-    // todo: Do we need to use LeaseManager::field_manager here?
-    let lease = kubert::lease::LeaseManager::init(api, LEASE_NAME).await?;
-    let hostname =
-        std::env::var("HOSTNAME").expect("Failed to fetch `HOSTNAME` environment variable");
-    let params = kubert::lease::ClaimParams {
-        lease_duration: LEASE_DURATION,
-        renew_grace_period: RENEW_GRACE_PERIOD,
-    };
-    let (claims, _task) = lease.spawn(hostname.clone(), params).await?;
-
-    // Build the status Index which will be used to process updates to policy
-    // resources and send to the status Controller.
-    let (updates_tx, updates_rx) = mpsc::unbounded_channel();
-    let status_index = status::Index::shared(hostname.clone(), claims.clone(), updates_tx);
-
     // Spawn the status Controller reconciliation.
     tokio::spawn(status::Index::run(status_index.clone()).instrument(info_span!("status::Index")));
 
-    // Spawn resource indexers that update the status Index.
-    let http_routes = runtime.watch_all::<k8s::policy::HttpRoute>(ListParams::default());
-    tokio::spawn(
-        kubert::index::namespaced(status_index.clone(), http_routes)
-            .instrument(info_span!("httproutes")),
-    );
-
-    let servers = runtime.watch_all::<k8s::policy::Server>(ListParams::default());
-    tokio::spawn(
-        kubert::index::namespaced(status_index.clone(), servers).instrument(info_span!("servers")),
-    );
-
     // Run the gRPC server, serving results by looking up against the index handle.
    tokio::spawn(grpc(
         grpc_addr,