mirror of https://github.com/linkerd/linkerd2.git
Add index_list to remove redundant kubert watches (#10548)
The policy controller has several indexes: the inbound index, the outbound index, and the status index. Each serves a different purpose, but each needs an overlapping set of kubert resource watches. This led to duplicating a kubert watch for the same resource type whenever multiple indexes needed it (e.g. all 3 indexes need to watch HttpRoute).

We previously introduced an IndexPair type which fans watch updates out to 2 indexes, but this is no longer sufficient now that we have 3 indexes. We replace IndexPair with IndexList, which can hold any number of indexes for a resource. We reorganize the policy controller's main to establish only one kubert watch for each resource and then use IndexList to fan updates out to the relevant indexes.

Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
parent 1eb868cb6a
commit 221e233f3d
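Concretely, each added index wraps the existing list as the tail of a new head, so a single watch can feed any number of indexes. A minimal sketch of the composition, assuming inbound_index, outbound_index, and status_index are the Arc<RwLock<_>> index handles built in main (the actual wiring appears in the main.rs hunks below):

// One kubert watch, three indexes: each update is cloned to every entry.
let indexes = IndexList::new(inbound_index.clone())
    .push(outbound_index.clone())
    .push(status_index.clone())
    .shared();

// Each `push` nests the type one level; with type names abbreviated, the chain
// above has the type
//   Arc<RwLock<IndexList<Status, IndexList<Outbound, IndexList<Inbound, Inbound>>>>>
// where the trailing Inbound is the unused placeholder tail type from `new`.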
index_list.rs (new file)
@@ -0,0 +1,59 @@
use std::sync::Arc;

use kubert::index::IndexNamespacedResource;
use parking_lot::RwLock;

/// A list of indexes for a specific resource type.
///
/// An `IndexList` itself can then act as an index for that resource, and fans updates
/// out to each index in the list by cloning the update.
pub struct IndexList<A, T = A> {
    index: Arc<RwLock<A>>,
    tail: Option<T>,
}

impl<A, T, R> IndexNamespacedResource<R> for IndexList<A, T>
where
    A: IndexNamespacedResource<R>,
    T: IndexNamespacedResource<R>,
    R: Clone,
{
    fn apply(&mut self, resource: R) {
        if let Some(tail) = &mut self.tail {
            tail.apply(resource.clone());
        }
        self.index.write().apply(resource);
    }

    fn delete(&mut self, namespace: String, name: String) {
        if let Some(tail) = &mut self.tail {
            tail.delete(namespace.clone(), name.clone());
        }
        self.index.write().delete(namespace, name);
    }
}

impl<A, T> IndexList<A, T> {
    pub fn push<B>(self, index: Arc<RwLock<B>>) -> IndexList<B, IndexList<A, T>> {
        IndexList {
            index,
            tail: Some(self),
        }
    }

    pub fn shared(self) -> Arc<RwLock<Self>> {
        Arc::new(RwLock::new(self))
    }
}

impl<A> IndexList<A> {
    /// Returns a new `IndexList`.
    ///
    /// The second type parameter in the return value here can be anything that
    /// implements `IndexNamespacedResource<R>`, since it will just be `None`.
    /// Ideally, the type should be `!` (bottom) but `A` is conveniently available,
    /// so we use that.
    pub fn new(index: Arc<RwLock<A>>) -> IndexList<A, A> {
        IndexList { index, tail: None }
    }
}
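As a usage sketch, not part of the commit: a hypothetical CountingIndex fanned out through an IndexList of three handles, exercising the apply path defined above. The IndexNamespacedResource signatures are those implemented by this file, and IndexList is assumed to be in scope from this module.

use std::sync::Arc;

use kubert::index::IndexNamespacedResource;
use parking_lot::RwLock;

// Hypothetical index used only for illustration: it counts applied updates.
#[derive(Default)]
struct CountingIndex {
    applies: usize,
}

impl IndexNamespacedResource<String> for CountingIndex {
    fn apply(&mut self, _resource: String) {
        self.applies += 1;
    }

    fn delete(&mut self, _namespace: String, _name: String) {}
}

fn main() {
    let a = Arc::new(RwLock::new(CountingIndex::default()));
    let b = Arc::new(RwLock::new(CountingIndex::default()));
    let c = Arc::new(RwLock::new(CountingIndex::default()));

    // The list is itself an IndexNamespacedResource<String>, so it can be
    // handed to a single watch in place of any one index.
    let mut indexes = IndexList::new(a.clone()).push(b.clone()).push(c.clone());

    // One apply fans out to every index in the list, cloning the resource.
    indexes.apply("my-route".to_string());
    assert_eq!(a.read().applies, 1);
    assert_eq!(b.read().applies, 1);
    assert_eq!(c.read().applies, 1);
}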
index_pair.rs (deleted)
@@ -1,34 +0,0 @@
use std::sync::Arc;

use kubert::index::IndexNamespacedResource;
use parking_lot::RwLock;

/// IndexPair holds two indexes and forwards resource updates to both indexes
/// by cloning the update.
pub struct IndexPair<A, B> {
    first: Arc<RwLock<A>>,
    second: Arc<RwLock<B>>,
}

impl<A, B, R> IndexNamespacedResource<R> for IndexPair<A, B>
where
    A: IndexNamespacedResource<R>,
    B: IndexNamespacedResource<R>,
    R: Clone,
{
    fn apply(&mut self, resource: R) {
        self.first.write().apply(resource.clone());
        self.second.write().apply(resource);
    }

    fn delete(&mut self, namespace: String, name: String) {
        self.first.write().delete(namespace.clone(), name.clone());
        self.second.write().delete(namespace, name);
    }
}

impl<A, B> IndexPair<A, B> {
    pub fn shared(first: Arc<RwLock<A>>, second: Arc<RwLock<B>>) -> Arc<RwLock<Self>> {
        Arc::new(RwLock::new(Self { first, second }))
    }
}
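For comparison: because a pair of indexes is itself an index, the removed IndexPair could in principle have been nested to reach a third index, but each additional index would change the type shape at every call site. A hypothetical sketch of that nesting, which the new IndexList replaces with a flat builder chain:

let indexes = IndexPair::shared(
    inbound_index.clone(),
    IndexPair::shared(outbound_index.clone(), status_index.clone()),
);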
lib.rs
@@ -1,10 +1,8 @@
 #![deny(warnings, rust_2018_idioms)]
 #![forbid(unsafe_code)]

 mod admission;
-mod index_pair;
+pub mod index_list;

 pub use self::admission::Admission;
-pub use self::index_pair::IndexPair;
 use anyhow::Result;
 use linkerd_policy_controller_core::inbound::{
     DiscoverInboundServer, InboundServer, InboundServerStream,
main.rs
@@ -6,8 +6,8 @@ use clap::Parser;
 use futures::prelude::*;
 use kube::api::ListParams;
 use linkerd_policy_controller::{
-    grpc, inbound, k8s, outbound, Admission, ClusterInfo, DefaultPolicy, InboundDiscover,
-    IndexPair, IpNet, OutboundDiscover,
+    grpc, inbound, index_list::IndexList, k8s, outbound, Admission, ClusterInfo, DefaultPolicy,
+    InboundDiscover, IpNet, OutboundDiscover,
 };
 use linkerd_policy_controller_k8s_index::ports::parse_portset;
 use linkerd_policy_controller_k8s_status::{self as status};
@@ -130,13 +130,31 @@ async fn main() -> Result<()> {
         probe_networks,
     });

-    // Build the index data structure, which will be used to process events from all watches
-    // The lookup handle is used by the gRPC server.
+    // Create the lease manager used for trying to claim the policy
+    // controller write lease.
+    let api = k8s::Api::namespaced(runtime.client(), &control_plane_namespace);
+    // todo: Do we need to use LeaseManager::field_manager here?
+    let lease = kubert::lease::LeaseManager::init(api, LEASE_NAME).await?;
+    let hostname =
+        std::env::var("HOSTNAME").expect("Failed to fetch `HOSTNAME` environment variable");
+    let params = kubert::lease::ClaimParams {
+        lease_duration: LEASE_DURATION,
+        renew_grace_period: RENEW_GRACE_PERIOD,
+    };
+    let (claims, _task) = lease.spawn(hostname.clone(), params).await?;
+
+    // Build the API index data structures which will maintain information
+    // necessary for serving the inbound policy and outbound policy gRPC APIs.
     let inbound_index = inbound::Index::shared(cluster_info.clone());
     let outbound_index = outbound::Index::shared(cluster_info);
-    let indexes = IndexPair::shared(inbound_index.clone(), outbound_index.clone());

-    // Spawn resource indexers that update the index and publish lookups for the gRPC server.
+    // Build the status index which will maintain information necessary for
+    // updating the status field of policy resources.
+    let (updates_tx, updates_rx) = mpsc::unbounded_channel();
+    let status_index = status::Index::shared(hostname.clone(), claims.clone(), updates_tx);
+
+    // Spawn resource watches.
+
     let pods =
         runtime.watch_all::<k8s::Pod>(ListParams::default().labels("linkerd.io/control-plane-ns"));
     tokio::spawn(
@@ -144,8 +162,11 @@ async fn main() -> Result<()> {
     );

     let servers = runtime.watch_all::<k8s::policy::Server>(ListParams::default());
+    let servers_indexes = IndexList::new(inbound_index.clone())
+        .push(status_index.clone())
+        .shared();
     tokio::spawn(
-        kubert::index::namespaced(inbound_index.clone(), servers).instrument(info_span!("servers")),
+        kubert::index::namespaced(servers_indexes, servers).instrument(info_span!("servers")),
     );

     let server_authzs =
@@ -177,50 +198,24 @@ async fn main() -> Result<()> {
     );

     let http_routes = runtime.watch_all::<k8s::policy::HttpRoute>(ListParams::default());
+    let http_routes_indexes = IndexList::new(inbound_index.clone())
+        .push(outbound_index.clone())
+        .push(status_index.clone())
+        .shared();
     tokio::spawn(
-        kubert::index::namespaced(indexes, http_routes).instrument(info_span!("httproutes")),
+        kubert::index::namespaced(http_routes_indexes, http_routes)
+            .instrument(info_span!("httproutes")),
     );

     let services = runtime.watch_all::<k8s::Service>(ListParams::default());
-
     tokio::spawn(
         kubert::index::namespaced(outbound_index.clone(), services)
             .instrument(info_span!("services")),
     );

-    // Create the lease manager used for trying to claim the policy
-    // controller write lease.
-    let api = k8s::Api::namespaced(runtime.client(), &control_plane_namespace);
-    // todo: Do we need to use LeaseManager::field_manager here?
-    let lease = kubert::lease::LeaseManager::init(api, LEASE_NAME).await?;
-    let hostname =
-        std::env::var("HOSTNAME").expect("Failed to fetch `HOSTNAME` environment variable");
-    let params = kubert::lease::ClaimParams {
-        lease_duration: LEASE_DURATION,
-        renew_grace_period: RENEW_GRACE_PERIOD,
-    };
-    let (claims, _task) = lease.spawn(hostname.clone(), params).await?;
-
-    // Build the status Index which will be used to process updates to policy
-    // resources and send to the status Controller.
-    let (updates_tx, updates_rx) = mpsc::unbounded_channel();
-    let status_index = status::Index::shared(hostname.clone(), claims.clone(), updates_tx);
-
     // Spawn the status Controller reconciliation.
     tokio::spawn(status::Index::run(status_index.clone()).instrument(info_span!("status::Index")));

-    // Spawn resource indexers that update the status Index.
-    let http_routes = runtime.watch_all::<k8s::policy::HttpRoute>(ListParams::default());
-    tokio::spawn(
-        kubert::index::namespaced(status_index.clone(), http_routes)
-            .instrument(info_span!("httproutes")),
-    );
-
-    let servers = runtime.watch_all::<k8s::policy::Server>(ListParams::default());
-    tokio::spawn(
-        kubert::index::namespaced(status_index.clone(), servers).instrument(info_span!("servers")),
-    );
-
     // Run the gRPC server, serving results by looking up against the index handle.
     tokio::spawn(grpc(
         grpc_addr,