Add `BundleNamespaceMapping` generation (#217)

* Add bundleNamespaceMapping generation

Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com>

* Update the docs on cluster group and bundle namespace mapping

Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com>

* Refactor to use ref mut when possible

Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com>

---------

Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com>
This commit is contained in:
Danil Grigorev 2025-03-19 13:14:19 +01:00 committed by GitHub
parent dd2a83d03b
commit 9c46365d26
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 622 additions and 275 deletions

View File

@ -31,7 +31,7 @@ jobs:
strategy:
matrix:
include:
- kube_version: "1.30.0"
- kube_version: "1.30.4"
display_name: "stable"
- kube_version: "1.32.0"
display_name: "latest"
@ -64,7 +64,7 @@ jobs:
strategy:
matrix:
include:
- kube_version: "1.30.0"
- kube_version: "1.30.4"
display_name: "stable"
- kube_version: "1.32.0"
display_name: "latest"
@ -97,7 +97,7 @@ jobs:
strategy:
matrix:
include:
- kube_version: "1.30.0"
- kube_version: "1.30.4"
display_name: "stable"
- kube_version: "1.32.0"
display_name: "latest"

View File

@ -100,6 +100,7 @@ rules:
- clusters
- clustergroups
- clusterregistrationtokens
- bundlenamespacemappings
verbs:
- create
- get

View File

@ -1,27 +1,66 @@
# Import strategy
# Import Strategy
CAAPF is following simple import strategy for CAPI clusters.
1. Per each CAPI cluster, there is a Fleet `Cluster` object
2. Per each CAPI Cluster Class there is a Fleet `ClusterGroup` object.
3. There is a default `ClusterGroup` for all `ClusterClasses` in the managmement cluster.
4. There is a default `ClusterGroup` for all CAPI `Clusters` in the management cluster.
5. For each CAPI `Cluster` referencing a `ClusterClass` in a different namespace, a `ClusterGroup` is created in the `Cluster` namespace. This `ClusterGroup` targets all clusters in this namespace, pointing to the same `ClusterClass`.
CAAPF follows a simple import strategy for CAPI clusters:
**By default, `CAAPF` imports all `CAPI` clusters under fleet management. See next section for configuration**
1. Each CAPI cluster has a corresponding Fleet `Cluster` object.
2. Each CAPI Cluster Class has a corresponding Fleet `ClusterGroup` object.
3. When a CAPI `Cluster` references a `ClusterClass` in a different namespace, a `ClusterGroup` is created in the `Cluster` namespace. This `ClusterGroup` targets all clusters in this namespace that reference the same `ClusterClass`. See the [configuration](#cluster-clustergroupbundlenamespacemapping-configuration) section for details.
4. If at least one CAPI `Cluster` references a `ClusterClass` in a different namespace, a [`BundleNamespaceMapping`][mapping] is created in the `ClusterClass` namespace. This allows Fleet `Cluster` resources to use application sources such as `Bundles`, `HelmApps`, or `GitRepos` from the `ClusterClass` namespace as if they were deployed in the `Cluster` namespace. See the [configuration](#cluster-clustergroupbundlenamespacemapping-configuration) section for details.
[mapping]: https://fleet.rancher.io/namespaces#cross-namespace-deployments
**By default, `CAAPF` imports all `CAPI` clusters under Fleet management. See the next section for configuration details.**
![CAAPF-import-groups excalidraw dark](https://github.com/rancher-sandbox/cluster-api-addon-provider-fleet/assets/32226600/0e0bf58d-7030-491e-976e-8363023f0c88)
## Label synchronization
## Label Synchronization
Fleet mainly relies on `Cluster` labels, `Cluster` names and `ClusterGroups` when performing target matching for the desired application or repo content deployment. For that reason `CAAPF` synchronizes labels from the `CAPI` clusters to the imported `Fleet` Cluster resource.
Fleet relies on `Cluster` labels, `Cluster` names, and `ClusterGroups` for target matching when deploying applications or referenced repository content. To ensure consistency, `CAAPF` synchronizes resource labels:
1. From the CAPI `ClusterClass` to the imported Fleet `Cluster` resource.
2. From the CAPI `ClusterClass` to the imported Fleet `ClusterGroup` resource.
When a CAPI `Cluster` references a `ClusterClass`, `CAAPF` applies two specific labels to both the `Cluster` and `ClusterGroup` resources:
- `clusterclass-name.fleet.addons.cluster.x-k8s.io: <class-name>`
- `clusterclass-namespace.fleet.addons.cluster.x-k8s.io: <class-ns>`
## Configuration
`FleetAddonConfig` provides several configuration options to define clusters to import.
`FleetAddonConfig` provides several configuration options to define which clusters to import.
### Cluster `ClusterGroup`/`BundleNamespaceMapping` Configuration
When a CAPI `Cluster` references a `ClusterClass` in a different namespace, a corresponding `ClusterGroup` is created in the **`Cluster`** namespace. This ensures that all clusters within the namespace that share the same `ClusterClass` from another namespace are grouped together.
This `ClusterGroup` inherits `ClusterClass` labels and applies two `CAAPF`-specific labels to uniquely identify the group within the cluster scope:
- `clusterclass-name.fleet.addons.cluster.x-k8s.io: <class-name>`
- `clusterclass-namespace.fleet.addons.cluster.x-k8s.io: <class-ns>`
Additionally, this configuration enables the creation of a `BundleNamespaceMapping`. This mapping selects all available bundles and establishes a link between the namespace of the `Cluster` and the namespace of the referenced `ClusterClass`. This allows the Fleet `Cluster` to be evaluated as a target for application sources such as `Bundles`, `HelmApps`, or `GitRepos` from the **`ClusterClass`** namespace.
When all CAPI `Cluster` resources referencing the same `ClusterClass` are removed, both the `ClusterGroup` and `BundleNamespaceMapping` are cleaned up.
To enable this behavior, configure `FleetAddonConfig` as follows:
```yaml
apiVersion: addons.cluster.x-k8s.io/v1alpha1
kind: FleetAddonConfig
metadata:
name: fleet-addon-config
spec:
cluster:
applyClassGroup: true
```
Setting `applyClassGroup: true` ensures that Fleet automatically creates a `ClusterGroup` object for each `Cluster` resource and applies the necessary `BundleNamespaceMapping` for cross-namespace bundle access.
**Note: If the `cluster` field is not set, this setting is enabled by default.**
### Namespace Label Selection
This section defines how to select namespaces based on specific labels. The `namespaceSelector` field ensures that the import strategy applies only to namespaces that have the label `import: "true"`. This is useful for scoping automatic import to specific namespaces rather than applying it cluster-wide.
This configuration defines how to select namespaces based on specific labels. The `namespaceSelector` field ensures that the import strategy applies only to namespaces that have the label `import: "true"`. This is useful for scoping automatic import to specific namespaces rather than applying it cluster-wide.
```yaml
apiVersion: addons.cluster.x-k8s.io/v1alpha1
@ -37,7 +76,7 @@ spec:
### Cluster Label Selection
This section filters clusters based on labels, ensuring that the FleetAddonConfig applies only to clusters with the label `import: "true"`. This allows more granular per-cluster selection across the cluster scope.
This configuration filters clusters based on labels, ensuring that the `FleetAddonConfig` applies only to clusters with the label `import: "true"`. This allows more granular per-cluster selection across the cluster scope.
```yaml
apiVersion: addons.cluster.x-k8s.io/v1alpha1

View File

@ -90,7 +90,7 @@ start-dev: _cleanup-out-dir _create-out-dir _download-kubectl
kind delete cluster --name dev || true
kind create cluster --image=kindest/node:v{{KUBE_VERSION}} --config testdata/kind-config.yaml
just install-capi
kubectl wait pods --for=condition=Ready --timeout=150s --all --all-namespaces
kubectl wait pods --for=condition=Ready --timeout=300s --all --all-namespaces
# Stop the local dev environment
stop-dev:
@ -101,7 +101,7 @@ deploy-kindnet:
kubectl --context kind-dev apply -f testdata/cni.yaml
deploy-calico:
kubectl --context kind-dev apply -f testdata/helm.yaml
kubectl --context kind-dev apply -f testdata/helm.yaml -n clusterclass
deploy-calico-gitrepo: _download-yq
#!/usr/bin/env bash
@ -179,7 +179,7 @@ collect-test-import:
-just collect-artifacts docker-demo
# Full e2e test of importing cluster in fleet
test-cluster-class-import: start-dev deploy deploy-child-cluster-class deploy-calico deploy-app _test-import-all && collect-test-cluster-class-import
test-cluster-class-import: start-dev deploy deploy-child-cluster-class deploy-calico deploy-app _test-import-all _test-delete-all && collect-test-cluster-class-import
collect-test-cluster-class-import:
-just collect-artifacts dev
@ -192,6 +192,7 @@ test-cluster-class-import-agent-initated: start-dev && collect-test-cluster-clas
just deploy-kindnet
just deploy-app
just _test-import-all
just _test-delete-all
collect-artifacts cluster:
kind get kubeconfig --name {{cluster}} > {{OUT_DIR}}/kubeconfig
@ -200,15 +201,19 @@ collect-artifacts cluster:
# Full e2e test of importing cluster and ClusterClass in fleet
[private]
_test-import-all:
kubectl wait pods --for=condition=Ready --timeout=150s --all --all-namespaces
kubectl wait clustergroups.fleet.cattle.io -n clusterclass --timeout=300s --for=condition=Ready=true quick-start
kubectl wait clustergroups.fleet.cattle.io -n clusterclass --timeout=300s --for=condition=Ready=true quick-start
kubectl wait clustergroups.fleet.cattle.io -n clusterclass --timeout=300s --for=create --for=condition=Ready=true quick-start
# Verify that cluster group created for cluster referencing clusterclass in a different namespace
kubectl wait clustergroups.fleet.cattle.io --timeout=150s --for=create quick-start.clusterclass
kubectl wait clustergroups.fleet.cattle.io --timeout=150s --for=jsonpath='{.status.clusterCount}=1' quick-start.clusterclass
kubectl wait clustergroups.fleet.cattle.io --timeout=300s --for=condition=Ready=true quick-start.clusterclass
kubectl wait clusters.fleet.cattle.io --timeout=150s --for=create capi-quickstart
kubectl wait clusters.fleet.cattle.io --timeout=300s --for=condition=Ready=true capi-quickstart
kubectl wait bundlenamespacemappings.fleet.cattle.io --timeout=300s --for=create -n clusterclass default
kubectl wait clustergroups.fleet.cattle.io --timeout=300s --for=create --for=jsonpath='{.status.clusterCount}=1' --for=condition=Ready=true quick-start.clusterclass
kubectl wait clusters.fleet.cattle.io --timeout=300s --for=create --for=condition=Ready=true capi-quickstart
[private]
_test-delete-all:
# Verify that deleting everything causes full re-import
kubectl delete clustergroups.fleet.cattle.io -n clusterclass quick-start --wait
kubectl delete bundlenamespacemappings.fleet.cattle.io -n clusterclass default --wait
kubectl delete clusters.fleet.cattle.io capi-quickstart --wait
just _test-import-all
# Install kopium
[private]

View File

@ -0,0 +1,114 @@
use std::collections::BTreeMap;
use kube::{
api::{ObjectMeta, TypeMeta},
Resource,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
mod mapping {
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
#[kube(
kind = "BundleNamespaceMapping",
group = "fleet.cattle.io",
version = "v1alpha1",
namespaced
)]
pub struct BundleNamespaceMappingFantomSpec {}
}
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
#[resource(inherit = mapping::BundleNamespaceMapping)]
#[serde(rename_all = "camelCase")]
pub struct BundleNamespaceMapping {
#[serde(flatten, default)]
pub types: Option<TypeMeta>,
pub metadata: ObjectMeta,
pub bundle_selector: BundleNamespaceMappingBundleSelector,
pub namespace_selector: BundleNamespaceMappingNamespaceSelector,
}
/// A label selector is a label query over a set of resources. The result of matchLabels and
/// matchExpressions are ANDed. An empty label selector matches all objects. A null
/// label selector matches no objects.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, JsonSchema)]
pub struct BundleNamespaceMappingBundleSelector {
/// matchExpressions is a list of label selector requirements. The requirements are ANDed.
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "matchExpressions"
)]
pub match_expressions: Option<Vec<BundleNamespaceMappingBundleSelectorMatchExpressions>>,
/// matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels
/// map is equivalent to an element of matchExpressions, whose key field is "key", the
/// operator is "In", and the values array contains only "value". The requirements are ANDed.
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "matchLabels"
)]
pub match_labels: Option<BTreeMap<String, String>>,
}
/// A label selector requirement is a selector that contains values, a key, and an operator that
/// relates the key and values.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, JsonSchema)]
pub struct BundleNamespaceMappingBundleSelectorMatchExpressions {
/// key is the label key that the selector applies to.
pub key: String,
/// operator represents a key's relationship to a set of values.
/// Valid operators are In, NotIn, Exists and DoesNotExist.
pub operator: String,
/// values is an array of string values. If the operator is In or NotIn,
/// the values array must be non-empty. If the operator is Exists or DoesNotExist,
/// the values array must be empty. This array is replaced during a strategic
/// merge patch.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub values: Option<Vec<String>>,
}
/// A label selector is a label query over a set of resources. The result of matchLabels and
/// matchExpressions are ANDed. An empty label selector matches all objects. A null
/// label selector matches no objects.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, JsonSchema)]
pub struct BundleNamespaceMappingNamespaceSelector {
/// matchExpressions is a list of label selector requirements. The requirements are ANDed.
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "matchExpressions"
)]
pub match_expressions: Option<Vec<BundleNamespaceMappingNamespaceSelectorMatchExpressions>>,
/// matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels
/// map is equivalent to an element of matchExpressions, whose key field is "key", the
/// operator is "In", and the values array contains only "value". The requirements are ANDed.
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "matchLabels"
)]
pub match_labels: Option<BTreeMap<String, String>>,
}
/// A label selector requirement is a selector that contains values, a key, and an operator that
/// relates the key and values.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, JsonSchema)]
pub struct BundleNamespaceMappingNamespaceSelectorMatchExpressions {
/// key is the label key that the selector applies to.
pub key: String,
/// operator represents a key's relationship to a set of values.
/// Valid operators are In, NotIn, Exists and DoesNotExist.
pub operator: String,
/// values is an array of string values. If the operator is In or NotIn,
/// the values array must be non-empty. If the operator is Exists or DoesNotExist,
/// the values array must be empty. This array is replaced during a strategic
/// merge patch.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub values: Option<Vec<String>>,
}

View File

@ -1,10 +1,25 @@
use cluster_api_rs::capi_cluster::{ClusterSpec, ClusterStatus};
use std::collections::BTreeMap;
use cluster_api_rs::capi_cluster::{ClusterSpec, ClusterStatus, ClusterTopology};
use fleet_api_rs::fleet_clustergroup::{ClusterGroupSelector, ClusterGroupSpec};
use kube::{
api::{ObjectMeta, TypeMeta},
Resource,
Resource, ResourceExt as _,
};
#[cfg(feature = "agent-initiated")]
use rand::distr::{Alphanumeric, SampleString as _};
use serde::{Deserialize, Serialize};
use super::{
bundle_namespace_mapping::{BundleNamespaceMapping, BundleNamespaceMappingNamespaceSelector},
fleet_addon_config::ClusterConfig,
fleet_cluster,
fleet_clustergroup::{ClusterGroup, CLUSTER_CLASS_LABEL, CLUSTER_CLASS_NAMESPACE_LABEL},
};
#[cfg(feature = "agent-initiated")]
use super::fleet_cluster_registration_token::ClusterRegistrationToken;
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
#[resource(inherit = cluster_api_rs::capi_cluster::Cluster)]
pub struct Cluster {
@ -14,3 +29,185 @@ pub struct Cluster {
pub spec: ClusterSpec,
pub status: Option<ClusterStatus>,
}
impl From<&Cluster> for ObjectMeta {
fn from(cluster: &Cluster) -> Self {
Self {
name: Some(cluster.name_any()),
namespace: cluster.namespace(),
..Default::default()
}
}
}
impl Cluster {
pub(crate) fn to_group(self: &Cluster, config: Option<&ClusterConfig>) -> Option<ClusterGroup> {
config?.apply_class_group().then_some(true)?;
if let cluster_api_rs::capi_cluster::ClusterSpec {
topology:
Some(ClusterTopology {
class_namespace: Some(class_namespace),
class,
..
}),
..
} = &self.spec
{
// Cluster groups creation for cluster class namespace are handled by ClusterClass controller
if Some(class_namespace) == self.namespace().as_ref() {
return None;
}
let labels = {
let mut labels = BTreeMap::default();
labels.insert(CLUSTER_CLASS_LABEL.to_string(), class.clone());
labels.insert(
CLUSTER_CLASS_NAMESPACE_LABEL.to_string(),
class_namespace.clone(),
);
Some(labels)
};
return Some(ClusterGroup {
types: Some(TypeMeta::resource::<ClusterGroup>()),
metadata: ObjectMeta {
name: Some(format!("{class}.{class_namespace}")),
namespace: self.namespace(),
labels: labels.clone(),
owner_references: self.owner_ref(&()).into_iter().map(Into::into).collect(),
..Default::default()
},
spec: ClusterGroupSpec {
selector: Some(ClusterGroupSelector {
match_labels: labels,
..Default::default()
}),
},
..Default::default()
});
}
None
}
pub(crate) fn to_cluster(
self: &Cluster,
config: Option<&ClusterConfig>,
) -> fleet_cluster::Cluster {
let empty = ClusterConfig::default();
let config = config.unwrap_or(&empty);
let labels = match &self.spec.topology {
Some(ClusterTopology {
class,
class_namespace,
..
}) if !class.is_empty() => {
let mut labels = self.labels().clone();
labels.insert(CLUSTER_CLASS_LABEL.to_string(), class.clone());
labels.insert(
CLUSTER_CLASS_NAMESPACE_LABEL.to_string(),
class_namespace
.clone()
.unwrap_or(self.namespace().unwrap_or_default()),
);
labels
}
None | Some(ClusterTopology { .. }) => self.labels().clone(),
};
fleet_cluster::Cluster {
types: Some(TypeMeta::resource::<fleet_cluster::Cluster>()),
metadata: ObjectMeta {
labels: Some(labels),
owner_references: config
.set_owner_references
.is_some_and(|set| set)
.then_some(self.owner_ref(&()).into_iter().collect()),
name: config.apply_naming(self.name_any()).into(),
..self.into()
},
#[cfg(feature = "agent-initiated")]
spec: match config.agent_initiated_connection() {
true => fleet_api_rs::fleet_cluster::ClusterSpec {
client_id: Some(Alphanumeric.sample_string(&mut rand::rng(), 64)),
agent_namespace: config.agent_install_namespace().into(),
agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars.clone(),
..Default::default()
}
.into(),
false => fleet_api_rs::fleet_cluster::ClusterSpec {
kube_config_secret: Some(format!("{}-kubeconfig", self.name_any())),
agent_namespace: config.agent_install_namespace().into(),
agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars.clone(),
..Default::default()
}
.into(),
},
#[cfg(not(feature = "agent-initiated"))]
spec: fleet_api_rs::fleet_cluster::ClusterSpec {
kube_config_secret: Some(format!("{}-kubeconfig", self.name_any())),
agent_namespace: config.agent_install_namespace().into(),
agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars.clone(),
..Default::default()
},
..Default::default()
}
}
pub(crate) fn to_bundle_ns_mapping(
&self,
config: Option<&ClusterConfig>,
) -> Option<BundleNamespaceMapping> {
config?.apply_class_group().then_some(true)?;
let topology = self.spec.topology.as_ref()?;
let class_namespace = topology.class_namespace.clone()?;
let match_labels = {
let mut labels = BTreeMap::default();
labels.insert("kubernetes.io/metadata.name".into(), self.namespace()?);
Some(labels)
};
Some(BundleNamespaceMapping {
types: Some(TypeMeta::resource::<BundleNamespaceMapping>()),
metadata: ObjectMeta {
name: self.namespace(),
namespace: Some(class_namespace),
..Default::default()
},
bundle_selector: Default::default(),
namespace_selector: BundleNamespaceMappingNamespaceSelector {
match_labels,
..Default::default()
},
})
}
#[cfg(feature = "agent-initiated")]
pub(crate) fn to_cluster_registration_token(
self: &Cluster,
config: Option<&ClusterConfig>,
) -> Option<ClusterRegistrationToken> {
use fleet_api_rs::fleet_cluster_registration_token::ClusterRegistrationTokenSpec;
config?.agent_initiated?.then_some(true)?;
ClusterRegistrationToken {
metadata: self.into(),
spec: ClusterRegistrationTokenSpec {
ttl: Some("1h".into()),
}
.into(),
..Default::default()
}
.into()
}
}

View File

@ -178,6 +178,10 @@ impl ClusterConfig {
let strategy = self.naming.clone().unwrap_or_default();
strategy.apply(name.clone().into()).unwrap_or(name)
}
pub(crate) fn apply_class_group(&self) -> bool {
self.apply_class_group.is_some_and(|enabled| enabled)
}
}
/// NamingStrategy is controlling Fleet cluster naming

View File

@ -1,10 +1,22 @@
use fleet_api_rs::fleet_clustergroup::{ClusterGroupSpec, ClusterGroupStatus};
use std::collections::BTreeMap;
use fleet_api_rs::fleet_clustergroup::{
ClusterGroupSelector, ClusterGroupSpec, ClusterGroupStatus,
};
use k8s_openapi::api::core::v1::ObjectReference;
use kube::{
api::{ObjectMeta, TypeMeta},
Resource,
runtime::reflector::ObjectRef,
Resource, ResourceExt as _,
};
use serde::{Deserialize, Serialize};
use super::capi_clusterclass::ClusterClass;
pub static CLUSTER_CLASS_LABEL: &str = "clusterclass-name.fleet.addons.cluster.x-k8s.io";
pub static CLUSTER_CLASS_NAMESPACE_LABEL: &str =
"clusterclass-namespace.fleet.addons.cluster.x-k8s.io";
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
#[resource(inherit = fleet_api_rs::fleet_clustergroup::ClusterGroup)]
pub struct ClusterGroup {
@ -14,3 +26,75 @@ pub struct ClusterGroup {
pub spec: ClusterGroupSpec,
pub status: Option<ClusterGroupStatus>,
}
impl ClusterGroup {
pub(crate) fn cluster_class_namespace(&self) -> Option<String> {
self.labels()
.iter()
.find_map(|(key, class_ns)| (*key == CLUSTER_CLASS_NAMESPACE_LABEL).then_some(class_ns))
.cloned()
}
pub(crate) fn cluster_class_name(&self) -> Option<String> {
self.labels()
.iter()
.find_map(|(key, class)| (*key == CLUSTER_CLASS_LABEL).then_some(class))
.cloned()
}
pub(crate) fn cluster_class_ref(&self) -> Option<ObjectReference> {
let name = self.cluster_class_name()?;
let namespace = self.cluster_class_namespace()?;
Some(
ObjectRef::<ClusterClass>::new(&name)
.within(&namespace)
.into(),
)
}
}
impl From<&ClusterClass> for ClusterGroup {
fn from(cluster_class: &ClusterClass) -> Self {
let labels = {
let mut labels = cluster_class.labels().clone();
labels.insert(CLUSTER_CLASS_LABEL.to_string(), cluster_class.name_any());
labels.insert(
CLUSTER_CLASS_NAMESPACE_LABEL.to_string(),
cluster_class.namespace().unwrap_or_default(),
);
Some(labels)
};
let match_labels = {
let mut labels = BTreeMap::default();
labels.insert(CLUSTER_CLASS_LABEL.to_string(), cluster_class.name_any());
labels.insert(
CLUSTER_CLASS_NAMESPACE_LABEL.to_string(),
cluster_class.namespace().unwrap_or_default(),
);
Some(labels)
};
Self {
types: Some(TypeMeta::resource::<ClusterGroup>()),
metadata: ObjectMeta {
name: Some(cluster_class.name_any()),
namespace: cluster_class.namespace(),
labels,
owner_references: cluster_class
.owner_ref(&())
.into_iter()
.map(Into::into)
.collect(),
..Default::default()
},
spec: ClusterGroupSpec {
selector: Some(ClusterGroupSelector {
match_labels,
..Default::default()
}),
},
..Default::default()
}
}
}

View File

@ -1,3 +1,4 @@
pub mod bundle_namespace_mapping;
pub mod capi_cluster;
pub mod capi_clusterclass;
pub mod fleet_addon_config;

View File

@ -1,3 +1,4 @@
use crate::api::bundle_namespace_mapping::BundleNamespaceMapping;
use crate::api::capi_cluster::Cluster;
use crate::api::capi_clusterclass::ClusterClass;
use crate::api::fleet_addon_config::FleetAddonConfig;
@ -273,7 +274,6 @@ pub async fn run_cluster_controller(state: State) {
)
.for_each(|_| futures::future::ready(()));
let (sub, reader) = state.dispatcher.subscribe();
let fleet = metadata_watcher(
Api::<fleet_cluster::Cluster>::all(client.clone()),
Config::default().any_semantic(),
@ -282,8 +282,27 @@ pub async fn run_cluster_controller(state: State) {
.touched_objects()
.predicate_filter(predicates::resource_version);
let clusters = Controller::for_shared_stream(sub, reader)
let mappings = metadata_watcher(
Api::<BundleNamespaceMapping>::all(client.clone()),
Config::default().any_semantic(),
)
.modify(|g| g.managed_fields_mut().clear())
.touched_objects()
.predicate_filter(predicates::resource_version);
let (sub, reader) = state.dispatcher.subscribe();
let clusters = Controller::for_shared_stream(sub, reader.clone())
.owns_stream(fleet)
.watches_stream(mappings, move |mapping| {
reader
.state()
.into_iter()
.filter_map(move |c: Arc<Cluster>| {
let in_namespace =
c.spec.topology.as_ref()?.class_namespace == mapping.namespace();
in_namespace.then_some(ObjectRef::from_obj(c.deref()))
})
})
.shutdown_on_signal()
.run(
Cluster::reconcile,
@ -332,9 +351,27 @@ pub async fn run_cluster_controller_pre_1_32(state: State) {
.touched_objects()
.predicate_filter(predicates::resource_version);
let mappings = metadata_watcher(
Api::<BundleNamespaceMapping>::all(client.clone()),
Config::default().any_semantic(),
)
.modify(|g| g.managed_fields_mut().clear())
.touched_objects()
.predicate_filter(predicates::resource_version);
let (invoke_reconcile, namespace_trigger) = mpsc::channel(0);
let clusters = Controller::for_stream(clusters, reader)
let clusters = Controller::for_stream(clusters, reader.clone())
.owns_stream(fleet)
.watches_stream(mappings, move |mapping| {
reader
.state()
.into_iter()
.filter_map(move |c: Arc<Cluster>| {
let in_namespace =
c.spec.topology.as_ref()?.class_namespace == mapping.namespace();
in_namespace.then_some(ObjectRef::from_obj(c.deref()))
})
})
.reconcile_all_on(namespace_trigger)
.shutdown_on_signal()
.run(

View File

@ -1,19 +1,17 @@
use crate::api::bundle_namespace_mapping::BundleNamespaceMapping;
use crate::api::capi_cluster::Cluster;
use crate::api::fleet_addon_config::{ClusterConfig, FleetAddonConfig};
use crate::api::fleet_addon_config::FleetAddonConfig;
use crate::api::fleet_cluster::{self};
#[cfg(feature = "agent-initiated")]
use crate::api::fleet_cluster_registration_token::ClusterRegistrationToken;
use crate::api::fleet_clustergroup::ClusterGroup;
use crate::Error;
use cluster_api_rs::capi_cluster::ClusterTopology;
use fleet_api_rs::fleet_cluster::ClusterSpec;
use fleet_api_rs::fleet_clustergroup::{ClusterGroupSelector, ClusterGroupSpec};
use futures::channel::mpsc::Sender;
use futures::StreamExt as _;
use k8s_openapi::api::core::v1::Namespace;
use kube::api::{ApiResource, Object, ObjectMeta, PatchParams, TypeMeta};
use kube::api::{ApiResource, Object, PatchParams};
use kube::core::SelectorExt as _;
use kube::runtime::watcher::{self, Config};
@ -26,11 +24,9 @@ use serde_json::Value;
use tokio::sync::Mutex;
use tracing::{info, warn};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use super::cluster_class::{CLUSTER_CLASS_LABEL, CLUSTER_CLASS_NAMESPACE_LABEL};
use super::controller::{
fetch_config, get_or_create, patch, Context, FleetBundle, FleetController,
};
@ -42,6 +38,7 @@ pub struct FleetClusterBundle {
template_sources: TemplateSources,
fleet: fleet_cluster::Cluster,
fleet_group: Option<ClusterGroup>,
mapping: Option<BundleNamespaceMapping>,
#[cfg(feature = "agent-initiated")]
cluster_registration_token: Option<ClusterRegistrationToken>,
config: FleetAddonConfig,
@ -97,171 +94,33 @@ impl TemplateSources {
}
}
impl From<&Cluster> for ObjectMeta {
fn from(cluster: &Cluster) -> Self {
Self {
name: Some(cluster.name_any()),
namespace: cluster.meta().namespace.clone(),
..Default::default()
}
}
}
impl Cluster {
fn to_group(self: &Cluster, config: Option<ClusterConfig>) -> Option<ClusterGroup> {
if let Some(ClusterConfig {
apply_class_group: Some(true),
..
}) = config
{
} else {
return None;
};
if let cluster_api_rs::capi_cluster::ClusterSpec {
topology:
Some(ClusterTopology {
class_namespace: Some(class_namespace),
class,
..
}),
..
} = &self.spec
{
// Cluster groups creation for cluster class namespace are handled by ClusterClass controller
if Some(class_namespace) == self.namespace().as_ref() {
return None;
}
let labels = {
let mut labels = BTreeMap::default();
labels.insert(CLUSTER_CLASS_LABEL.to_string(), class.clone());
labels.insert(
CLUSTER_CLASS_NAMESPACE_LABEL.to_string(),
class_namespace.clone(),
);
Some(labels)
};
return Some(ClusterGroup {
types: Some(TypeMeta::resource::<ClusterGroup>()),
metadata: ObjectMeta {
name: Some(format!("{class}.{class_namespace}")),
namespace: self.namespace(),
labels: labels.clone(),
owner_references: self.owner_ref(&()).into_iter().map(Into::into).collect(),
..Default::default()
},
spec: ClusterGroupSpec {
selector: Some(ClusterGroupSelector {
match_labels: labels,
..Default::default()
}),
},
..Default::default()
});
}
None
}
fn to_cluster(self: &Cluster, config: Option<ClusterConfig>) -> fleet_cluster::Cluster {
let config = config.unwrap_or_default();
let labels = match &self.spec.topology {
Some(ClusterTopology {
class,
class_namespace,
..
}) if !class.is_empty() => {
let mut labels = self.labels().clone();
labels.insert(CLUSTER_CLASS_LABEL.to_string(), class.clone());
labels.insert(
CLUSTER_CLASS_NAMESPACE_LABEL.to_string(),
class_namespace
.clone()
.unwrap_or(self.namespace().unwrap_or_default()),
);
labels
}
None | Some(ClusterTopology { .. }) => self.labels().clone(),
};
fleet_cluster::Cluster {
types: Some(TypeMeta::resource::<fleet_cluster::Cluster>()),
metadata: ObjectMeta {
labels: Some(labels),
owner_references: config
.set_owner_references
.is_some_and(|set| set)
.then_some(self.owner_ref(&()).into_iter().collect()),
name: config.apply_naming(self.name_any()).into(),
..self.into()
},
#[cfg(feature = "agent-initiated")]
spec: match config.agent_initiated_connection() {
true => ClusterSpec {
client_id: Some(Alphanumeric.sample_string(&mut rand::rng(), 64)),
agent_namespace: config.agent_install_namespace().into(),
agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars,
..Default::default()
}
.into(),
false => ClusterSpec {
kube_config_secret: Some(format!("{}-kubeconfig", self.name_any())),
agent_namespace: config.agent_install_namespace().into(),
agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars,
..Default::default()
}
.into(),
},
#[cfg(not(feature = "agent-initiated"))]
spec: ClusterSpec {
kube_config_secret: Some(format!("{}-kubeconfig", self.name_any())),
agent_namespace: config.agent_install_namespace().into(),
agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars,
..Default::default()
},
..Default::default()
}
}
#[cfg(feature = "agent-initiated")]
fn to_cluster_registration_token(
self: &Cluster,
config: Option<ClusterConfig>,
) -> Option<ClusterRegistrationToken> {
use fleet_api_rs::fleet_cluster_registration_token::ClusterRegistrationTokenSpec;
config?.agent_initiated?.then_some(true)?;
ClusterRegistrationToken {
metadata: self.into(),
spec: ClusterRegistrationTokenSpec {
ttl: Some("1h".into()),
}
.into(),
..Default::default()
}
.into()
}
}
impl FleetBundle for FleetClusterBundle {
#[allow(refining_impl_trait)]
async fn sync(&self, ctx: Arc<Context>) -> ClusterSyncResult<Action> {
let mut cluster = self.fleet.clone();
async fn sync(&mut self, ctx: Arc<Context>) -> ClusterSyncResult<Action> {
let cluster = &mut self.fleet;
if let Some(template) = self.template_sources.resolve(ctx.client.clone()).await {
let template = serde_json::from_value(template)?;
cluster.spec.template_values = Some(template);
}
if let Some(mapping) = self.mapping.as_mut() {
if self.config.cluster_patch_enabled() {
let cluster_name = cluster.name_any();
patch(
ctx.clone(),
mapping,
&PatchParams::apply(&format!("cluster-{cluster_name}-addon-provider-fleet")),
)
.await
.map_err(ClusterSyncError::BundleNamespaceMappingError)?;
let class_namespace = mapping.namespace().unwrap_or_default();
let cluster_namespace = mapping.name_any();
info!("Updated BundleNamespaceMapping for cluster {cluster_name} between class namespace: {class_namespace} and cluster namespace: {cluster_namespace}")
};
}
match self.config.cluster_patch_enabled() {
true => {
patch(
@ -275,11 +134,11 @@ impl FleetBundle for FleetClusterBundle {
};
#[cfg(feature = "agent-initiated")]
if let Some(cluster_registration_token) = self.cluster_registration_token.clone() {
if let Some(cluster_registration_token) = self.cluster_registration_token.as_ref() {
get_or_create(ctx.clone(), cluster_registration_token).await?;
}
if let Some(group) = self.fleet_group.clone() {
if let Some(group) = self.fleet_group.as_mut() {
let cluster_name = self.fleet.name_any();
if self.config.cluster_patch_enabled() {
patch(
@ -300,7 +159,7 @@ impl FleetController for Cluster {
type Bundle = FleetClusterBundle;
async fn to_bundle(&self, ctx: Arc<Context>) -> BundleResult<Option<FleetClusterBundle>> {
let config = fetch_config(ctx.clone().client.clone()).await?;
let config = fetch_config(ctx.client.clone()).await?;
if ctx.version < 32 && !self.matching_labels(&config, ctx.client.clone()).await? {
return Ok(None);
@ -316,11 +175,12 @@ impl FleetController for Cluster {
Ok(Some(FleetClusterBundle {
template_sources: TemplateSources::new(self),
fleet: self.to_cluster(config.spec.cluster.clone()),
fleet_group: self.to_group(config.spec.cluster.clone()),
fleet: self.to_cluster(config.spec.cluster.as_ref()),
fleet_group: self.to_group(config.spec.cluster.as_ref()),
mapping: self.to_bundle_ns_mapping(config.spec.cluster.as_ref()),
#[cfg(feature = "agent-initiated")]
cluster_registration_token: self
.to_cluster_registration_token(config.spec.cluster.clone()),
.to_cluster_registration_token(config.spec.cluster.as_ref()),
config,
}))
}

View File

@ -3,12 +3,10 @@ use crate::api::capi_clusterclass::ClusterClass;
use crate::api::fleet_addon_config::{ClusterClassConfig, FleetAddonConfig};
use crate::api::fleet_clustergroup::ClusterGroup;
use fleet_api_rs::fleet_clustergroup::{ClusterGroupSelector, ClusterGroupSpec};
use kube::api::{ObjectMeta, PatchParams, TypeMeta};
use kube::api::PatchParams;
use kube::{api::ResourceExt, runtime::controller::Action, Resource};
use kube::runtime::controller::Action;
use std::collections::BTreeMap;
use std::sync::Arc;
use super::controller::{
@ -16,63 +14,24 @@ use super::controller::{
};
use super::{BundleResult, GroupSyncResult};
pub static CLUSTER_CLASS_LABEL: &str = "clusterclass-name.fleet.addons.cluster.x-k8s.io";
pub static CLUSTER_CLASS_NAMESPACE_LABEL: &str =
"clusterclass-namespace.fleet.addons.cluster.x-k8s.io";
pub struct FleetClusterClassBundle {
fleet_group: ClusterGroup,
config: FleetAddonConfig,
}
impl From<&ClusterClass> for ClusterGroup {
fn from(cluster_class: &ClusterClass) -> Self {
let labels = {
let mut labels = BTreeMap::default();
labels.insert(CLUSTER_CLASS_LABEL.to_string(), cluster_class.name_any());
labels.insert(
CLUSTER_CLASS_NAMESPACE_LABEL.to_string(),
cluster_class.namespace().unwrap_or_default(),
);
Some(labels)
};
Self {
types: Some(TypeMeta::resource::<ClusterGroup>()),
metadata: ObjectMeta {
name: Some(cluster_class.name_any()),
namespace: cluster_class.meta().namespace.clone(),
labels: labels.clone(),
owner_references: cluster_class
.owner_ref(&())
.into_iter()
.map(Into::into)
.collect(),
..Default::default()
},
spec: ClusterGroupSpec {
selector: Some(ClusterGroupSelector {
match_labels: labels,
..Default::default()
}),
},
..Default::default()
}
}
}
impl FleetBundle for FleetClusterClassBundle {
#[allow(refining_impl_trait)]
async fn sync(&self, ctx: Arc<Context>) -> GroupSyncResult<Action> {
async fn sync(&mut self, ctx: Arc<Context>) -> GroupSyncResult<Action> {
match self.config.cluster_class_patch_enabled() {
true => {
patch(
ctx,
self.fleet_group.clone(),
&mut self.fleet_group,
&PatchParams::apply("addon-provider-fleet"),
)
.await?
}
false => get_or_create(ctx.clone(), self.fleet_group.clone()).await?,
false => get_or_create(ctx.clone(), &self.fleet_group).await?,
};
Ok(Action::await_change())
@ -83,7 +42,7 @@ impl FleetController for ClusterClass {
type Bundle = FleetClusterClassBundle;
async fn to_bundle(&self, ctx: Arc<Context>) -> BundleResult<Option<FleetClusterClassBundle>> {
let config = fetch_config(ctx.clone().client.clone()).await?;
let config = fetch_config(ctx.client.clone()).await?;
if !config.cluster_class_operations_enabled() {
return Ok(None);
}

View File

@ -1,7 +1,10 @@
use crate::api::bundle_namespace_mapping::BundleNamespaceMapping;
use crate::api::fleet_clustergroup::ClusterGroup;
use cluster_api_rs::capi_clusterclass::ClusterClass;
use kube::api::PatchParams;
use kube::runtime::controller::Action;
use kube::{Api, ResourceExt};
use std::sync::Arc;
@ -11,14 +14,38 @@ use super::{BundleResult, GroupSyncResult};
impl FleetBundle for ClusterGroup {
// Applies finalizer on the existing ClusterGroup object, so the deletion event is not missed
#[allow(refining_impl_trait)]
async fn sync(&self, ctx: Arc<Context>) -> GroupSyncResult<Action> {
patch(
ctx.clone(),
self.clone(),
&PatchParams::apply("addon-provider-fleet"),
)
.await?;
async fn sync(&mut self, ctx: Arc<Context>) -> GroupSyncResult<Action> {
if let Some(cc_ref) = self.cluster_class_ref() {
let class = ctx.client.fetch::<ClusterClass>(&cc_ref).await?;
self.labels_mut().extend(
class
.labels()
.iter()
.map(|(k, v)| (k.to_string(), v.to_string())),
);
patch(
ctx.clone(),
self,
&PatchParams::apply("addon-provider-fleet"),
)
.await?;
}
Ok(Action::await_change())
}
async fn cleanup(&mut self, ctx: Arc<Context>) -> Result<Action, super::SyncError> {
let class_ns = self.cluster_class_namespace();
let namespace = self.namespace();
if class_ns.is_some() && class_ns != namespace {
let api = Api::<BundleNamespaceMapping>::namespaced(
ctx.client.clone(),
&class_ns.unwrap_or_default(),
);
api.delete(&namespace.unwrap_or_default(), &Default::default())
.await?;
}
Ok(Action::await_change())
}
}

View File

@ -51,18 +51,14 @@ pub struct Context {
pub version: u32,
}
pub(crate) async fn get_or_create<R>(ctx: Arc<Context>, res: R) -> GetOrCreateResult<Action>
pub(crate) async fn get_or_create<R>(ctx: Arc<Context>, res: &R) -> GetOrCreateResult<Action>
where
R: std::fmt::Debug,
R: Clone + Serialize + DeserializeOwned,
R: kube::Resource<DynamicType = (), Scope = NamespaceResourceScope>,
R: kube::ResourceExt,
{
let ns = res
.meta()
.namespace
.clone()
.unwrap_or(String::from("default"));
let ns = res.namespace().unwrap_or(String::from("default"));
let api = Api::namespaced(ctx.client.clone(), &ns);
let obj = api
@ -74,7 +70,7 @@ where
return Ok(Action::await_change());
}
api.create(&PostParams::default(), &res)
api.create(&PostParams::default(), res)
.await
.map_err(GetOrCreateError::Create)?;
@ -103,24 +99,19 @@ where
Ok(Action::await_change())
}
pub(crate) async fn patch<R>(ctx: Arc<Context>, res: R, pp: &PatchParams) -> PatchResult<Action>
pub(crate) async fn patch<R>(ctx: Arc<Context>, res: &mut R, pp: &PatchParams) -> PatchResult<Action>
where
R: std::fmt::Debug,
R: Clone + Serialize + DeserializeOwned,
R: kube::Resource<DynamicType = (), Scope = NamespaceResourceScope>,
R: kube::ResourceExt,
{
let ns = res
.meta()
.namespace
.clone()
.unwrap_or(String::from("default"));
let ns = res.namespace().unwrap_or(String::from("default"));
let api: Api<R> = Api::namespaced(ctx.client.clone(), &ns);
let mut res = res.clone();
res.meta_mut().managed_fields = None;
api.patch(&res.name_any(), pp, &Patch::Apply(res.clone()))
api.patch(&res.name_any(), pp, &Patch::Apply(&res))
.await
.map_err(PatchError::Patch)?;
@ -157,7 +148,10 @@ pub(crate) async fn fetch_config(client: Client) -> ConfigFetchResult<FleetAddon
}
pub(crate) trait FleetBundle {
async fn sync(&self, ctx: Arc<Context>) -> Result<Action, impl Into<SyncError>>;
async fn sync(&mut self, ctx: Arc<Context>) -> Result<Action, impl Into<SyncError>>;
async fn cleanup(&mut self, _ctx: Arc<Context>) -> Result<Action, SyncError> {
Ok(Action::await_change())
}
}
pub(crate) trait FleetController
@ -180,7 +174,7 @@ where
finalizer(&api, FLEET_FINALIZER, self, |event| async {
match event {
finalizer::Event::Apply(c) => match c.to_bundle(ctx.clone()).await? {
Some(bundle) => bundle
Some(mut bundle) => bundle
.sync(ctx)
.await
.map_err(Into::into)
@ -212,6 +206,10 @@ where
)
.await?;
if let Some(mut bundle) = self.to_bundle(ctx.clone()).await? {
return Ok(bundle.cleanup(ctx).await?);
}
Ok(Action::await_change())
}

View File

@ -13,6 +13,9 @@ pub enum SyncError {
#[error("Cluster registration token create error {0}")]
ClusterRegistrationTokenSync(#[from] GetOrCreateError),
#[error("BundleNamespaceMapping delete error: {0}")]
BundleNsMappingDelete(#[from] kube::Error),
}
pub type ClusterSyncResult<T, E = ClusterSyncError> = std::result::Result<T, E>;
@ -28,6 +31,12 @@ pub enum ClusterSyncError {
#[error("Cluster group update error: {0}")]
GroupPatchError(#[source] PatchError),
#[error("Cluster BundleNamespaceMapping update error: {0}")]
BundleNamespaceMappingError(#[source] PatchError),
#[error("Cluster BundleNamespaceMapping lookup error")]
MappingLookupError(#[from] kube::Error),
#[error("Cluster json encoding error: {0}")]
ClusterEncodeError(#[from] serde_json::Error),
}
@ -41,6 +50,9 @@ pub enum GroupSyncError {
#[error("Cluster group update error: {0}")]
PatchError(#[from] PatchError),
#[error("Unable to find origin ClusterClass for the ClusterGroup: {0}")]
ClassLookup(#[from] kube::Error),
}
pub type GetOrCreateResult<T, E = GetOrCreateError> = std::result::Result<T, E>;
@ -88,6 +100,15 @@ pub enum BundleError {
#[error("{0}")]
Config(#[from] ConfigFetchError),
#[error("BundleNamespaceMapping creating error: {0}")]
Mapping(#[from] BundleMappingError),
}
#[derive(Error, Debug)]
pub enum BundleMappingError {
#[error("ClusterClass lookup error: {0}")]
ClusterClassLookup(#[from] kube::Error),
}
pub type ConfigFetchResult<T> = std::result::Result<T, ConfigFetchError>;