Label selector based import setting (#68)
* Use development branch for the time being Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com> * Add support for namespace and cluster label selectors Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com> * Rework FleetAgentConfig and include selectors Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com> * Add e2e including FleetAddonConfig Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com> --------- Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com>
This commit is contained in:
parent
8001898217
commit
bd03e8c0b0
|
|
@ -23,7 +23,7 @@ jobs:
|
|||
- uses: actions/checkout@v4
|
||||
- name: Test
|
||||
run: just test-unit
|
||||
test-e2e:
|
||||
test-e2e-cluster-class:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Install just
|
||||
|
|
@ -42,5 +42,26 @@ jobs:
|
|||
uses: actions/upload-artifact@v4
|
||||
if: always()
|
||||
with:
|
||||
name: artifacts
|
||||
name: artifacts-cluster-class-import
|
||||
path: _out/gather
|
||||
test-e2e-import:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Install just
|
||||
uses: extractions/setup-just@v2
|
||||
- name: Install kind
|
||||
uses: helm/kind-action@v1
|
||||
with:
|
||||
install_only: true
|
||||
- uses: actions/checkout@v4
|
||||
- name: Test
|
||||
run: just test-import
|
||||
- name: Collect artifacts
|
||||
if: always()
|
||||
run: just collect-test-import
|
||||
- name: Store run artifacts
|
||||
uses: actions/upload-artifact@v4
|
||||
if: always()
|
||||
with:
|
||||
name: artifacts-import
|
||||
path: _out/gather
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
|
|
@ -34,7 +34,7 @@ actix-web = "4.4.0"
|
|||
futures = "0.3.28"
|
||||
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }
|
||||
k8s-openapi = { version = "0.22", features = ["latest", "schemars"] }
|
||||
kube = { version = "0.92", features = ["runtime", "client", "derive"]}
|
||||
kube = { git = "https://github.com/kube-rs/kube.git", features = ["runtime", "client", "derive"]}
|
||||
schemars = { version = "0.8.21", features = ["chrono"] }
|
||||
serde = { version = "1.0.203", features = ["derive"] }
|
||||
serde_json = "1.0.117"
|
||||
|
|
|
|||
|
|
@ -22,22 +22,49 @@ spec:
|
|||
description: This provides a config for fleet addon functionality
|
||||
properties:
|
||||
cluster:
|
||||
description: Cluster controller settings
|
||||
description: |-
|
||||
Enable Cluster config funtionality.
|
||||
|
||||
This will create Fleet Cluster for each Cluster with the same name. In case the cluster specifies topology.class, the name of the ClusterClass will be added to the Fleet Cluster labels.
|
||||
nullable: true
|
||||
properties:
|
||||
agent_namespace:
|
||||
agentNamespace:
|
||||
description: Namespace selection for the fleet agent
|
||||
nullable: true
|
||||
type: string
|
||||
enabled:
|
||||
description: |-
|
||||
Enable Cluster config funtionality.
|
||||
|
||||
This will create Fleet Cluster for each Cluster with the same name. In case the cluster specifies topology.class, the name of the ClusterClass will be added to the Fleet Cluster labels.
|
||||
nullable: true
|
||||
type: boolean
|
||||
namespaceSelector:
|
||||
description: 'Namespace label selector. If set, only clusters in the namespace matching label selector will be imported. WARN: this field controls the state of opened watches to the cluster. If changed, requires controller to be reloaded.'
|
||||
properties:
|
||||
matchExpressions:
|
||||
description: matchExpressions is a list of label selector requirements. The requirements are ANDed.
|
||||
items:
|
||||
description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values.
|
||||
properties:
|
||||
key:
|
||||
description: key is the label key that the selector applies to.
|
||||
type: string
|
||||
operator:
|
||||
description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist.
|
||||
type: string
|
||||
values:
|
||||
description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch.
|
||||
items:
|
||||
type: string
|
||||
type: array
|
||||
required:
|
||||
- key
|
||||
- operator
|
||||
type: object
|
||||
type: array
|
||||
matchLabels:
|
||||
additionalProperties:
|
||||
type: string
|
||||
description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed.
|
||||
type: object
|
||||
type: object
|
||||
naming:
|
||||
description: Naming settings for the fleet cluster
|
||||
nullable: true
|
||||
properties:
|
||||
prefix:
|
||||
description: Specify a prefix for the Cluster name, applied to created Fleet cluster
|
||||
|
|
@ -48,33 +75,64 @@ spec:
|
|||
nullable: true
|
||||
type: string
|
||||
type: object
|
||||
set_owner_references:
|
||||
patchResource:
|
||||
description: Allow to patch resources, maintaining the desired state. If is not set, resources will only be re-created in case of removal.
|
||||
nullable: true
|
||||
type: boolean
|
||||
selector:
|
||||
description: 'Cluster label selector. If set, only clusters matching label selector will be imported. WARN: this field controls the state of opened watches to the cluster. If changed, requires controller to be reloaded.'
|
||||
properties:
|
||||
matchExpressions:
|
||||
description: matchExpressions is a list of label selector requirements. The requirements are ANDed.
|
||||
items:
|
||||
description: A label selector requirement is a selector that contains values, a key, and an operator that relates the key and values.
|
||||
properties:
|
||||
key:
|
||||
description: key is the label key that the selector applies to.
|
||||
type: string
|
||||
operator:
|
||||
description: operator represents a key's relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist.
|
||||
type: string
|
||||
values:
|
||||
description: values is an array of string values. If the operator is In or NotIn, the values array must be non-empty. If the operator is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch.
|
||||
items:
|
||||
type: string
|
||||
type: array
|
||||
required:
|
||||
- key
|
||||
- operator
|
||||
type: object
|
||||
type: array
|
||||
matchLabels:
|
||||
additionalProperties:
|
||||
type: string
|
||||
description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed.
|
||||
type: object
|
||||
type: object
|
||||
setOwnerReferences:
|
||||
description: Setting to disable setting owner references on the created resources
|
||||
nullable: true
|
||||
type: boolean
|
||||
required:
|
||||
- naming
|
||||
- namespaceSelector
|
||||
- selector
|
||||
type: object
|
||||
cluster_class:
|
||||
description: Cluster class controller settings
|
||||
clusterClass:
|
||||
description: |-
|
||||
Enable clusterClass controller functionality.
|
||||
|
||||
This will create Fleet ClusterGroups for each ClusterClaster with the same name.
|
||||
nullable: true
|
||||
properties:
|
||||
enabled:
|
||||
description: |-
|
||||
Enable clusterClass controller functionality.
|
||||
|
||||
This will create Fleet ClusterGroups for each ClusterClaster with the same name.
|
||||
patchResource:
|
||||
description: Allow to patch resources, maintaining the desired state. If is not set, resources will only be re-created in case of removal.
|
||||
nullable: true
|
||||
type: boolean
|
||||
set_owner_references:
|
||||
setOwnerReferences:
|
||||
description: Setting to disable setting owner references on the created resources
|
||||
nullable: true
|
||||
type: boolean
|
||||
type: object
|
||||
patch_resource:
|
||||
description: Allow to patch resources, maintaining the desired state.
|
||||
nullable: true
|
||||
type: boolean
|
||||
type: object
|
||||
required:
|
||||
- spec
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@ rules:
|
|||
- namespaces
|
||||
verbs:
|
||||
- list
|
||||
- get
|
||||
- watch
|
||||
- apiGroups:
|
||||
- events.k8s.io
|
||||
resources:
|
||||
|
|
|
|||
10
justfile
10
justfile
|
|
@ -105,11 +105,13 @@ deploy-crs:
|
|||
|
||||
# Deploy child cluster using docker & kubeadm
|
||||
deploy-child-cluster:
|
||||
kubectl --context kind-dev apply -f testdata/config.yaml
|
||||
kubectl --context kind-dev apply -f testdata/cluster_docker_kcp.yaml
|
||||
|
||||
# Deploy child cluster-call based cluster using docker & kubeadm
|
||||
deploy-child-cluster-class:
|
||||
kind delete cluster --name capi-quickstart || true
|
||||
kubectl --context kind-dev apply -f testdata/config.yaml
|
||||
kubectl --context kind-dev apply -f testdata/capi-quickstart.yaml
|
||||
|
||||
# Add and update helm repos used
|
||||
|
|
@ -179,10 +181,10 @@ _test-import-all:
|
|||
just deploy-child-cluster-class
|
||||
just deploy-crs
|
||||
kubectl wait pods --for=condition=Ready --timeout=150s --all --all-namespaces
|
||||
kubectl wait clustergroups.fleet.cattle.io --timeout=150s --for=jsonpath='{.status.clusterCount}=1' quick-start
|
||||
kubectl wait clustergroups.fleet.cattle.io --timeout=150s --for=condition=Ready=true quick-start
|
||||
kubectl wait clusters.fleet.cattle.io --timeout=150s --for=condition=Ready=false capi-quickstart
|
||||
kubectl wait clusters.fleet.cattle.io --timeout=150s --for=condition=Ready=true capi-quickstart
|
||||
kubectl wait clustergroups.fleet.cattle.io --timeout=150s --for=jsonpath='{.status.clusterCount}=1' quick-start -n cluster-class-test
|
||||
kubectl wait clustergroups.fleet.cattle.io --timeout=150s --for=condition=Ready=true quick-start -n cluster-class-test
|
||||
kubectl wait clusters.fleet.cattle.io --timeout=150s --for=condition=Ready=false capi-quickstart -n cluster-class-test
|
||||
kubectl wait clusters.fleet.cattle.io --timeout=150s --for=condition=Ready=true capi-quickstart -n cluster-class-test
|
||||
|
||||
# Install kopium
|
||||
[private]
|
||||
|
|
|
|||
|
|
@ -1,7 +1,13 @@
|
|||
use kube::CustomResource;
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
||||
use kube::{
|
||||
core::{ParseExpressionError, Selector},
|
||||
CustomResource,
|
||||
};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const AGENT_NAMESPACE: &str = "fleet-addon-agent";
|
||||
|
||||
/// This provides a config for fleet addon functionality
|
||||
#[derive(CustomResource, Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
|
||||
#[kube(
|
||||
|
|
@ -9,12 +15,18 @@ use serde::{Deserialize, Serialize};
|
|||
group = "addons.cluster.x-k8s.io",
|
||||
version = "v1alpha1"
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct FleetAddonConfigSpec {
|
||||
/// Allow to patch resources, maintaining the desired state.
|
||||
pub patch_resource: Option<bool>,
|
||||
/// Cluster class controller settings
|
||||
/// Enable clusterClass controller functionality.
|
||||
///
|
||||
/// This will create Fleet ClusterGroups for each ClusterClaster with the same name.
|
||||
pub cluster_class: Option<ClusterClassConfig>,
|
||||
/// Cluster controller settings
|
||||
|
||||
/// Enable Cluster config funtionality.
|
||||
///
|
||||
/// This will create Fleet Cluster for each Cluster with the same name.
|
||||
/// In case the cluster specifies topology.class, the name of the ClusterClass
|
||||
/// will be added to the Fleet Cluster labels.
|
||||
pub cluster: Option<ClusterConfig>,
|
||||
}
|
||||
|
||||
|
|
@ -23,7 +35,6 @@ impl Default for FleetAddonConfig {
|
|||
Self {
|
||||
metadata: Default::default(),
|
||||
spec: FleetAddonConfigSpec {
|
||||
patch_resource: Some(true),
|
||||
cluster_class: Some(ClusterClassConfig::default()),
|
||||
cluster: Some(ClusterConfig::default()),
|
||||
},
|
||||
|
|
@ -32,48 +43,72 @@ impl Default for FleetAddonConfig {
|
|||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ClusterClassConfig {
|
||||
/// Enable clusterClass controller functionality.
|
||||
///
|
||||
/// This will create Fleet ClusterGroups for each ClusterClaster with the same name.
|
||||
pub enabled: Option<bool>,
|
||||
|
||||
/// Setting to disable setting owner references on the created resources
|
||||
pub set_owner_references: Option<bool>,
|
||||
|
||||
/// Allow to patch resources, maintaining the desired state.
|
||||
/// If is not set, resources will only be re-created in case of removal.
|
||||
pub patch_resource: Option<bool>,
|
||||
}
|
||||
|
||||
impl Default for ClusterClassConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
patch_resource: Some(true),
|
||||
set_owner_references: Some(true),
|
||||
enabled: Some(true),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ClusterConfig {
|
||||
/// Enable Cluster config funtionality.
|
||||
///
|
||||
/// This will create Fleet Cluster for each Cluster with the same name.
|
||||
/// In case the cluster specifies topology.class, the name of the ClusterClass
|
||||
/// will be added to the Fleet Cluster labels.
|
||||
pub enabled: Option<bool>,
|
||||
/// Allow to patch resources, maintaining the desired state.
|
||||
/// If is not set, resources will only be re-created in case of removal.
|
||||
pub patch_resource: Option<bool>,
|
||||
|
||||
/// Setting to disable setting owner references on the created resources
|
||||
pub set_owner_references: Option<bool>,
|
||||
|
||||
/// Naming settings for the fleet cluster
|
||||
pub naming: NamingStrategy,
|
||||
pub naming: Option<NamingStrategy>,
|
||||
|
||||
/// Namespace selection for the fleet agent
|
||||
pub agent_namespace: Option<String>,
|
||||
|
||||
/// Import settings for the CAPI cluster. Allows to import clusters based on a set of labels,
|
||||
/// set on the cluster or the namespace.
|
||||
#[serde(flatten)]
|
||||
pub selectors: Selectors,
|
||||
|
||||
// /// Cluster label selector. If set, only clusters matching label selector will be imported.
|
||||
// /// WARN: this field controls the state of opened watches to the cluster. If changed, requires controller to be reloaded.
|
||||
// pub cluster: Option<LabelSelector>,
|
||||
#[cfg(feature = "agent-initiated")]
|
||||
/// Prepare initial cluster for agent initiated connection
|
||||
pub agent_initiated: Option<bool>,
|
||||
}
|
||||
|
||||
impl ClusterConfig {
|
||||
pub(crate) fn agent_install_namespace(&self) -> String {
|
||||
self.agent_namespace
|
||||
.clone()
|
||||
.unwrap_or(AGENT_NAMESPACE.to_string())
|
||||
}
|
||||
|
||||
#[cfg(feature = "agent-initiated")]
|
||||
pub(crate) fn agent_initiated_connection(&self) -> bool {
|
||||
self.agent_initiated.filter(|&set| set).is_some()
|
||||
}
|
||||
|
||||
pub(crate) fn apply_naming(&self, name: String) -> String {
|
||||
let strategy = self.naming.clone().unwrap_or_default();
|
||||
strategy.apply(name.clone().into()).unwrap_or(name)
|
||||
}
|
||||
}
|
||||
|
||||
/// NamingStrategy is controlling Fleet cluster naming
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Default)]
|
||||
pub struct NamingStrategy {
|
||||
|
|
@ -88,10 +123,11 @@ impl Default for ClusterConfig {
|
|||
Self {
|
||||
set_owner_references: Some(true),
|
||||
naming: Default::default(),
|
||||
agent_namespace: "fleet-addon-agent".to_string().into(),
|
||||
enabled: Some(true),
|
||||
agent_namespace: AGENT_NAMESPACE.to_string().into(),
|
||||
#[cfg(feature = "agent-initiated")]
|
||||
agent_initiated: Some(true),
|
||||
selectors: Default::default(),
|
||||
patch_resource: Some(true),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -109,6 +145,83 @@ impl NamingStrategy {
|
|||
}
|
||||
}
|
||||
|
||||
/// Selectors is controlling Fleet import strategy settings.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Selectors {
|
||||
/// Namespace label selector. If set, only clusters in the namespace matching label selector will be imported.
|
||||
/// WARN: this field controls the state of opened watches to the cluster. If changed, requires controller to be reloaded.
|
||||
pub namespace_selector: LabelSelector,
|
||||
|
||||
/// Cluster label selector. If set, only clusters matching label selector will be imported.
|
||||
/// WARN: this field controls the state of opened watches to the cluster. If changed, requires controller to be reloaded.
|
||||
pub selector: LabelSelector,
|
||||
}
|
||||
|
||||
impl FleetAddonConfig {
|
||||
// Raw cluster selector
|
||||
pub(crate) fn cluster_selector(&self) -> Result<Selector, ParseExpressionError> {
|
||||
self.spec
|
||||
.cluster
|
||||
.as_ref()
|
||||
.map(|c| c.selectors.selector.clone())
|
||||
.unwrap_or_default()
|
||||
.try_into()
|
||||
}
|
||||
|
||||
// Provide a static label selector for cluster objects, which can be always be set
|
||||
// and will not cause cache events from resources in the labeled Namespace to be missed
|
||||
pub(crate) fn cluster_watch(&self) -> Result<Selector, ParseExpressionError> {
|
||||
Ok(self
|
||||
.namespace_selector()?
|
||||
.selects_all()
|
||||
.then_some(self.cluster_selector()?)
|
||||
.unwrap_or_default())
|
||||
}
|
||||
|
||||
// Raw namespace selector
|
||||
pub(crate) fn namespace_selector(&self) -> Result<Selector, ParseExpressionError> {
|
||||
self.spec
|
||||
.cluster
|
||||
.as_ref()
|
||||
.map(|c| c.selectors.namespace_selector.clone())
|
||||
.unwrap_or_default()
|
||||
.try_into()
|
||||
}
|
||||
|
||||
// Check for general cluster operations, like create, patch, etc. Evaluates to false if disabled.
|
||||
pub(crate) fn cluster_operations_enabled(&self) -> bool {
|
||||
self.spec.cluster.is_some()
|
||||
}
|
||||
|
||||
// Check for general ClusterClass operations, like create, patch, etc. Evaluates to false if disabled.
|
||||
pub(crate) fn cluster_class_operations_enabled(&self) -> bool {
|
||||
self.spec.cluster_class.is_some()
|
||||
}
|
||||
|
||||
// Check for general cluster patching setting.
|
||||
pub(crate) fn cluster_patch_enabled(&self) -> bool {
|
||||
self.spec
|
||||
.cluster
|
||||
.as_ref()
|
||||
.map(|c| c.patch_resource)
|
||||
.unwrap_or_default()
|
||||
.filter(|&enabled| enabled)
|
||||
.is_some()
|
||||
}
|
||||
|
||||
// Check for general clusterClass patching setting.
|
||||
pub(crate) fn cluster_class_patch_enabled(&self) -> bool {
|
||||
self.spec
|
||||
.cluster
|
||||
.as_ref()
|
||||
.map(|c| c.patch_resource)
|
||||
.unwrap_or_default()
|
||||
.filter(|&enabled| enabled)
|
||||
.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::api::fleet_addon_config::NamingStrategy;
|
||||
|
|
|
|||
|
|
@ -1,13 +1,16 @@
|
|||
use crate::api::capi_cluster::Cluster;
|
||||
use crate::api::capi_clusterclass::ClusterClass;
|
||||
use crate::api::fleet_addon_config::FleetAddonConfig;
|
||||
use crate::api::fleet_cluster;
|
||||
use crate::api::fleet_clustergroup::ClusterGroup;
|
||||
use crate::controllers::controller::{Context, FleetController};
|
||||
use crate::metrics::Diagnostics;
|
||||
use crate::{Error, Metrics};
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use futures::StreamExt;
|
||||
|
||||
use k8s_openapi::api::core::v1::Namespace;
|
||||
use kube::{
|
||||
api::Api,
|
||||
client::Client,
|
||||
|
|
@ -16,6 +19,7 @@ use kube::{
|
|||
watcher::Config,
|
||||
},
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use std::sync::Arc;
|
||||
use tokio::{sync::RwLock, time::Duration};
|
||||
|
|
@ -70,17 +74,61 @@ pub async fn run_cluster_controller(state: State) {
|
|||
let clusters = Api::<Cluster>::all(client.clone());
|
||||
let fleet = Api::<fleet_cluster::Cluster>::all(client.clone());
|
||||
|
||||
Controller::new(clusters, Config::default().any_semantic())
|
||||
.owns(fleet, Config::default().any_semantic())
|
||||
.shutdown_on_signal()
|
||||
.run(
|
||||
Cluster::reconcile,
|
||||
error_policy,
|
||||
state.to_context(client.clone()),
|
||||
)
|
||||
.filter_map(|x| async move { std::result::Result::ok(x) })
|
||||
.for_each(|_| futures::future::ready(()))
|
||||
.await;
|
||||
let config_api: Api<FleetAddonConfig> = Api::all(client.clone());
|
||||
let config = config_api
|
||||
.get_opt("fleet-addon-config")
|
||||
.await
|
||||
.expect("failed to get FleetAddonConfig resource")
|
||||
.unwrap_or_default();
|
||||
|
||||
let (invoke_reconcile, namespace_trigger) = mpsc::channel(0);
|
||||
let clusters = Controller::new(
|
||||
clusters,
|
||||
Config::default()
|
||||
.labels_from(
|
||||
&config
|
||||
.cluster_watch()
|
||||
.expect("valid cluster label selector"),
|
||||
)
|
||||
.any_semantic(),
|
||||
)
|
||||
.owns(fleet, Config::default().any_semantic())
|
||||
.reconcile_all_on(namespace_trigger)
|
||||
.shutdown_on_signal()
|
||||
.run(
|
||||
Cluster::reconcile,
|
||||
error_policy,
|
||||
state.to_context(client.clone()),
|
||||
)
|
||||
.for_each(|_| futures::future::ready(()));
|
||||
|
||||
if config
|
||||
.namespace_selector()
|
||||
.expect("valid namespace selector")
|
||||
.selects_all()
|
||||
{
|
||||
return clusters.await;
|
||||
}
|
||||
|
||||
let ns_controller = Controller::new(
|
||||
Api::<Namespace>::all(client.clone()),
|
||||
Config::default()
|
||||
.labels_from(
|
||||
&config
|
||||
.namespace_selector()
|
||||
.expect("valid namespace selector"),
|
||||
)
|
||||
.any_semantic(),
|
||||
)
|
||||
.shutdown_on_signal()
|
||||
.run(
|
||||
Cluster::reconcile_ns,
|
||||
Cluster::ns_trigger_error_policy,
|
||||
Arc::new(Mutex::new(invoke_reconcile)),
|
||||
)
|
||||
.for_each(|_| futures::future::ready(()));
|
||||
|
||||
tokio::join!(clusters, ns_controller);
|
||||
}
|
||||
|
||||
/// Initialize the controller and shared state (given the crd is installed)
|
||||
|
|
|
|||
|
|
@ -7,18 +7,25 @@ use crate::api::fleet_cluster;
|
|||
use crate::api::fleet_cluster_registration_token::{
|
||||
ClusterRegistrationToken, ClusterRegistrationTokenSpec,
|
||||
};
|
||||
use crate::Result;
|
||||
use crate::{Error, Result};
|
||||
use futures::channel::mpsc::Sender;
|
||||
use k8s_openapi::api::core::v1::Namespace;
|
||||
use kube::api::ObjectMeta;
|
||||
|
||||
use kube::core::SelectorExt as _;
|
||||
use kube::{api::ResourceExt, runtime::controller::Action, Resource};
|
||||
use kube::{Api, Client};
|
||||
#[cfg(feature = "agent-initiated")]
|
||||
use rand::distributions::{Alphanumeric, DistString as _};
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::warn;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::cluster_class::CLUSTER_CLASS_LABEL;
|
||||
use super::controller::{get_or_create, patch, Context, FleetBundle, FleetController};
|
||||
use super::{ClusterSyncError, SyncError};
|
||||
use super::{ClusterSyncError, LabelCheckError, SyncError};
|
||||
|
||||
pub static CONTROLPLANE_READY_CONDITION: &str = "ControlPlaneReady";
|
||||
|
||||
|
|
@ -58,26 +65,26 @@ impl Cluster {
|
|||
.set_owner_references
|
||||
.is_some_and(|set| set)
|
||||
.then_some(self.owner_ref(&()).into_iter().collect()),
|
||||
name: config.naming.apply(self.name_any().into()),
|
||||
name: config.apply_naming(self.name_any()).into(),
|
||||
..self.into()
|
||||
},
|
||||
#[cfg(feature = "agent-initiated")]
|
||||
spec: match config.agent_initiated {
|
||||
Some(true) => fleet_cluster::ClusterSpec {
|
||||
spec: match config.agent_initiated_connection() {
|
||||
true => fleet_cluster::ClusterSpec {
|
||||
client_id: Some(Alphanumeric.sample_string(&mut rand::thread_rng(), 64)),
|
||||
agent_namespace: config.agent_namespace,
|
||||
agent_namespace: config.agent_install_namespace().into(),
|
||||
..Default::default()
|
||||
},
|
||||
None | Some(false) => fleet_cluster::ClusterSpec {
|
||||
false => fleet_cluster::ClusterSpec {
|
||||
kube_config_secret: Some(format!("{}-kubeconfig", self.name_any())),
|
||||
agent_namespace: config.agent_namespace,
|
||||
agent_namespace: config.agent_install_namespace().into(),
|
||||
..Default::default()
|
||||
},
|
||||
},
|
||||
#[cfg(not(feature = "agent-initiated"))]
|
||||
spec: fleet_cluster::ClusterSpec {
|
||||
kube_config_secret: Some(format!("{}-kubeconfig", self.name_any())),
|
||||
agent_namespace: config.agent_namespace,
|
||||
agent_namespace: config.agent_install_namespace().into(),
|
||||
..Default::default()
|
||||
},
|
||||
status: Default::default(),
|
||||
|
|
@ -109,7 +116,7 @@ impl FleetBundle for FleetClusterBundle {
|
|||
.map_err(Into::<ClusterSyncError>::into)
|
||||
.map_err(Into::<SyncError>::into)?;
|
||||
|
||||
if let Some(true) = self.config.spec.patch_resource {
|
||||
if self.config.cluster_patch_enabled() {
|
||||
patch(ctx.clone(), self.fleet.clone())
|
||||
.await
|
||||
.map_err(Into::<ClusterSyncError>::into)
|
||||
|
|
@ -133,14 +140,19 @@ impl FleetBundle for FleetClusterBundle {
|
|||
impl FleetController for Cluster {
|
||||
type Bundle = FleetClusterBundle;
|
||||
|
||||
fn to_bundle(&self, config: &FleetAddonConfig) -> Result<FleetClusterBundle> {
|
||||
config
|
||||
.spec
|
||||
.cluster
|
||||
.iter()
|
||||
.filter_map(|c| c.enabled)
|
||||
.find(|&enabled| enabled)
|
||||
.ok_or(SyncError::EarlyReturn)?;
|
||||
async fn to_bundle(
|
||||
&self,
|
||||
ctx: Arc<Context>,
|
||||
config: &FleetAddonConfig,
|
||||
) -> Result<FleetClusterBundle> {
|
||||
let matching_labels = self
|
||||
.matching_labels(config, ctx.client.clone())
|
||||
.await
|
||||
.map_err(Into::<SyncError>::into)?;
|
||||
|
||||
if !matching_labels || !config.cluster_operations_enabled() {
|
||||
Err(SyncError::EarlyReturn)?;
|
||||
}
|
||||
|
||||
self.cluster_ready().ok_or(SyncError::EarlyReturn)?;
|
||||
|
||||
|
|
@ -166,4 +178,36 @@ impl Cluster {
|
|||
|
||||
ready_condition.or(cp_ready).map(|_| self)
|
||||
}
|
||||
|
||||
pub async fn matching_labels(
|
||||
&self,
|
||||
config: &FleetAddonConfig,
|
||||
client: Client,
|
||||
) -> Result<bool, LabelCheckError> {
|
||||
let matches = config.cluster_selector()?.matches(self.labels()) || {
|
||||
let ns = self.namespace().unwrap_or("default".into());
|
||||
let namespace: Namespace = Api::all(client).get(ns.as_str()).await?;
|
||||
config.namespace_selector()?.matches(namespace.labels())
|
||||
};
|
||||
|
||||
Ok(matches)
|
||||
}
|
||||
|
||||
pub async fn reconcile_ns(
|
||||
_: Arc<Namespace>,
|
||||
invoke_reconcile: Arc<Mutex<Sender<()>>>,
|
||||
) -> crate::Result<Action> {
|
||||
let mut sender = invoke_reconcile.lock().await;
|
||||
sender.try_send(())?;
|
||||
Ok(Action::await_change())
|
||||
}
|
||||
|
||||
pub fn ns_trigger_error_policy(
|
||||
_: Arc<impl kube::Resource>,
|
||||
error: &Error,
|
||||
_: Arc<Mutex<Sender<()>>>,
|
||||
) -> Action {
|
||||
warn!("triggrer invocation failed: {:?}", error);
|
||||
Action::requeue(Duration::from_secs(5))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ impl FleetBundle for FleetClusterClassBundle {
|
|||
.map_err(Into::<GroupSyncError>::into)
|
||||
.map_err(Into::<SyncError>::into)?;
|
||||
|
||||
if let Some(true) = self.config.spec.patch_resource {
|
||||
if self.config.cluster_class_patch_enabled() {
|
||||
patch(ctx, self.fleet_group.clone())
|
||||
.await
|
||||
.map_err(Into::<GroupSyncError>::into)
|
||||
|
|
@ -73,14 +73,14 @@ impl FleetBundle for FleetClusterClassBundle {
|
|||
impl FleetController for ClusterClass {
|
||||
type Bundle = FleetClusterClassBundle;
|
||||
|
||||
fn to_bundle(&self, config: &FleetAddonConfig) -> Result<FleetClusterClassBundle> {
|
||||
config
|
||||
.spec
|
||||
.cluster_class
|
||||
.iter()
|
||||
.filter_map(|c| c.enabled)
|
||||
.find(|&enabled| enabled)
|
||||
.ok_or(SyncError::EarlyReturn)?;
|
||||
async fn to_bundle(
|
||||
&self,
|
||||
_ctx: Arc<Context>,
|
||||
config: &FleetAddonConfig,
|
||||
) -> Result<FleetClusterClassBundle> {
|
||||
if !config.cluster_class_operations_enabled() {
|
||||
Err(SyncError::EarlyReturn)?;
|
||||
}
|
||||
|
||||
let mut fleet_group: ClusterGroup = self.into();
|
||||
if let Some(ClusterClassConfig {
|
||||
|
|
|
|||
|
|
@ -161,7 +161,9 @@ where
|
|||
|
||||
finalizer(&cluster_api, FLEET_FINALIZER, self, |event| async {
|
||||
let r = match event {
|
||||
finalizer::Event::Apply(c) => c.to_bundle(&config)?.sync(ctx).await,
|
||||
finalizer::Event::Apply(c) => {
|
||||
c.to_bundle(ctx.clone(), &config).await?.sync(ctx).await
|
||||
}
|
||||
finalizer::Event::Cleanup(c) => c.cleanup(ctx).await,
|
||||
};
|
||||
|
||||
|
|
@ -193,5 +195,9 @@ where
|
|||
Ok(Action::await_change())
|
||||
}
|
||||
|
||||
fn to_bundle(&self, config: &FleetAddonConfig) -> crate::Result<Self::Bundle>;
|
||||
async fn to_bundle(
|
||||
&self,
|
||||
ctx: Arc<Context>,
|
||||
config: &FleetAddonConfig,
|
||||
) -> crate::Result<Self::Bundle>;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,9 @@ pub enum SyncError {
|
|||
#[error("{0}")]
|
||||
GroupSync(#[from] GroupSyncError),
|
||||
|
||||
#[error("{0}")]
|
||||
LabelCheck(#[from] LabelCheckError),
|
||||
|
||||
#[error("Cluster registration token create error {0}")]
|
||||
ClusterRegistrationTokenSync(#[from] GetOrCreateError),
|
||||
|
||||
|
|
@ -45,6 +48,15 @@ pub enum GetOrCreateError {
|
|||
Event(#[from] kube::Error),
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum LabelCheckError {
|
||||
#[error("Namespace lookup error: {0}")]
|
||||
NamespaceLookup(#[from] kube::Error),
|
||||
|
||||
#[error("Parse expression error: {0}")]
|
||||
Expression(#[from] kube::core::ParseExpressionError),
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum PatchError {
|
||||
#[error("Patch error: {0}")]
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
use controllers::SyncError;
|
||||
use futures::channel::mpsc::TrySendError;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
|
|
@ -12,6 +13,9 @@ pub enum Error {
|
|||
#[error("Fleet error: {0}")]
|
||||
FleetError(#[from] SyncError),
|
||||
|
||||
#[error("Namespace trigger error: {0}")]
|
||||
TriggerError(#[from] TrySendError<()>),
|
||||
|
||||
#[error("Finalizer Error: {0}")]
|
||||
// NB: awkward type because finalizer::Error embeds the reconciler error (which is this)
|
||||
// so boxing this error to break cycles
|
||||
|
|
|
|||
|
|
@ -1,9 +1,11 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
|
||||
use crate::Error;
|
||||
use chrono::{DateTime, Utc};
|
||||
use kube::{runtime::events::{Recorder, Reporter}, Client, Resource, ResourceExt};
|
||||
use kube::{
|
||||
runtime::events::{Recorder, Reporter},
|
||||
Client, Resource, ResourceExt,
|
||||
};
|
||||
use prometheus::{histogram_opts, opts, HistogramVec, IntCounter, IntCounterVec, Registry};
|
||||
use serde::Serialize;
|
||||
use tokio::time::Instant;
|
||||
|
|
|
|||
|
|
@ -1,8 +1,15 @@
|
|||
apiVersion: v1
|
||||
kind: Namespace
|
||||
metadata:
|
||||
name: cluster-class-test
|
||||
labels:
|
||||
import: ""
|
||||
---
|
||||
apiVersion: cluster.x-k8s.io/v1beta1
|
||||
kind: ClusterClass
|
||||
metadata:
|
||||
name: quick-start
|
||||
namespace: default
|
||||
namespace: cluster-class-test
|
||||
spec:
|
||||
controlPlane:
|
||||
machineInfrastructure:
|
||||
|
|
@ -234,7 +241,7 @@ apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
|
|||
kind: DockerClusterTemplate
|
||||
metadata:
|
||||
name: quick-start-cluster
|
||||
namespace: default
|
||||
namespace: cluster-class-test
|
||||
spec:
|
||||
template:
|
||||
spec: {}
|
||||
|
|
@ -243,7 +250,7 @@ apiVersion: controlplane.cluster.x-k8s.io/v1beta1
|
|||
kind: KubeadmControlPlaneTemplate
|
||||
metadata:
|
||||
name: quick-start-control-plane
|
||||
namespace: default
|
||||
namespace: cluster-class-test
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
|
|
@ -264,7 +271,7 @@ apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
|
|||
kind: DockerMachineTemplate
|
||||
metadata:
|
||||
name: quick-start-control-plane
|
||||
namespace: default
|
||||
namespace: cluster-class-test
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
|
|
@ -276,7 +283,7 @@ apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
|
|||
kind: DockerMachineTemplate
|
||||
metadata:
|
||||
name: quick-start-default-worker-machinetemplate
|
||||
namespace: default
|
||||
namespace: cluster-class-test
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
|
|
@ -288,7 +295,7 @@ apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
|
|||
kind: DockerMachinePoolTemplate
|
||||
metadata:
|
||||
name: quick-start-default-worker-machinepooltemplate
|
||||
namespace: default
|
||||
namespace: cluster-class-test
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
|
|
@ -298,7 +305,7 @@ apiVersion: bootstrap.cluster.x-k8s.io/v1beta1
|
|||
kind: KubeadmConfigTemplate
|
||||
metadata:
|
||||
name: quick-start-default-worker-bootstraptemplate
|
||||
namespace: default
|
||||
namespace: cluster-class-test
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
|
|
@ -309,7 +316,7 @@ apiVersion: cluster.x-k8s.io/v1beta1
|
|||
kind: Cluster
|
||||
metadata:
|
||||
name: capi-quickstart
|
||||
namespace: default
|
||||
namespace: cluster-class-test
|
||||
labels:
|
||||
cni: kindnet
|
||||
spec:
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ apiVersion: cluster.x-k8s.io/v1beta1
|
|||
kind: Cluster
|
||||
metadata:
|
||||
labels:
|
||||
import: ""
|
||||
cni: kindnet
|
||||
env: dev
|
||||
name: docker-demo
|
||||
|
|
|
|||
|
|
@ -0,0 +1,16 @@
|
|||
apiVersion: addons.cluster.x-k8s.io/v1alpha1
|
||||
kind: FleetAddonConfig
|
||||
metadata:
|
||||
name: fleet-addon-config
|
||||
spec:
|
||||
clusterClass:
|
||||
patchResource: true
|
||||
cluster:
|
||||
patchResource: true
|
||||
selector:
|
||||
matchLabels:
|
||||
import: ""
|
||||
namespaceSelector:
|
||||
matchLabels:
|
||||
import: ""
|
||||
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
apiVersion: networking.k8s.io/v1
|
||||
kind: IngressClass
|
||||
metadata:
|
||||
annotations:
|
||||
ingressclass.kubernetes.io/is-default-class: "true"
|
||||
name: nginx
|
||||
Loading…
Reference in New Issue