Note: For this setup to work, you need to install Fleet and Fleet CRDs charts via
-`FleetAddonConfig` resource. Both need to have version version >= v0.12.0-alpha.14, which provides support
+`FleetAddonConfig` resource. Both need to be at version >= v0.12.0-beta.1,
+which provides support for the `HelmApp` resource.
@@ -13,7 +14,7 @@ In this tutorial we will deploy `Calico` CNI using `HelmApp` resource and `Fleet
Here's an example of how a `HelmApp` resource can be used in combination with templateValues to deploy an application consistently on any matching cluster.
-In this scenario we are matching cluster directly by name, using `clusterName` reference, but a `clusterGroup` or a label based selection can be used instead:
+In this scenario we are matching the cluster directly by name, using the `clusterName` reference, but a `clusterGroup` or a label-based selection can be used instead of, or together with, `clusterName`:
```yaml
targets:
- clusterName: docker-demo
@@ -22,33 +23,7 @@ In this scenario we are matching cluster directly by name, using `clusterName` r
We are deploying the `HelmApp` resource in the `default` namespace. The namespace should match that of the CAPI Cluster so that fleet can locate it.
```yaml
-apiVersion: fleet.cattle.io/v1alpha1
-kind: HelmApp
-metadata:
- name: calico
-spec:
- helm:
- releaseName: projectcalico
- repo: https://docs.tigera.io/calico/charts
- chart: tigera-operator
- templateValues:
- installation: |-
- cni:
- type: Calico
- ipam:
- type: HostLocal
- calicoNetwork:
- bgp: Disabled
- mtu: 1350
- ipPools:
- ${- range $cidr := .ClusterValues.Cluster.spec.clusterNetwork.pods.cidrBlocks }
- - cidr: "${ $cidr }"
- encapsulation: None
- natOutgoing: Enabled
- nodeSelector: all()${- end}
- insecureSkipTLSVerify: true
- targets:
- - clusterName: docker-demo
+{{#include ../../../testdata/helm.yaml}}
```
`HelmApp` supports fleet [templating][] options, otherwise available exclusively to the `fleet.yaml` configuration, stored in the [git repository contents][], and applied via the `GitRepo` resource.
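+
+For instance, a template expression can interpolate any field of the matched CAPI `Cluster` resource into the chart values. A minimal sketch (the `clusterName` value key is illustrative, following the `.ClusterValues.Cluster` structure used above):
+
+```yaml
+templateValues:
+  clusterName: "${ .ClusterValues.Cluster.metadata.name }"
+```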
diff --git a/docs/src/03_tutorials/04_installing_calico_via_gitrepo.md b/docs/src/03_tutorials/04_installing_calico_via_gitrepo.md
new file mode 100644
index 0000000..7357052
--- /dev/null
+++ b/docs/src/03_tutorials/04_installing_calico_via_gitrepo.md
@@ -0,0 +1,98 @@
+# Installing Calico CNI using GitRepo
+
+Note: For this setup to work, you need to have the Fleet and Fleet CRDs charts installed
+with version >= `v0.12.0-alpha.14`.
+
+In this tutorial we will deploy the `Calico` CNI using a `GitRepo` resource on an `RKE2`-based docker cluster.
+
+## Deploying RKE2 docker cluster
+
+We will first need to create an RKE2-based docker cluster from templates:
+
+```bash
+> kubectl apply -f testdata/cluster_docker_rke2.yaml
+dockercluster.infrastructure.cluster.x-k8s.io/docker-demo created
+cluster.cluster.x-k8s.io/docker-demo created
+dockermachinetemplate.infrastructure.cluster.x-k8s.io/docker-demo-control-plane created
+rke2controlplane.controlplane.cluster.x-k8s.io/docker-demo-control-plane created
+dockermachinetemplate.infrastructure.cluster.x-k8s.io/docker-demo-md-0 created
+rke2configtemplate.bootstrap.cluster.x-k8s.io/docker-demo-md-0 created
+machinedeployment.cluster.x-k8s.io/docker-demo-md-0 created
+configmap/docker-demo-lb-config created
+```
+
+In this scenario the cluster is located in the `default` namespace, where the rest of the fleet objects will go.
+The cluster is labeled with `cni: calico` so that the `GitRepo` can match on it.
+
+```yaml
+apiVersion: cluster.x-k8s.io/v1beta1
+kind: Cluster
+metadata:
+ name: docker-demo
+ labels:
+ cni: calico
+```
+
+Now that the cluster is created, the `GitRepo` can be applied; it will be evaluated asynchronously.
+
+## Deploying Calico CNI via `GitRepo`
+
+We will first review the content of our `fleet.yaml` file:
+
+```yaml
+{{#include ../../../fleet/applications/calico/fleet.yaml}}
+```
+
+In this scenario we are using a `helm` definition that is consistent with the `HelmApp` spec from the [previous][] guide and defines the same templating rules.
+
+We also need to [resolve conflicts][], which occur because the `calico` controllers modify some resources in place. For that, the `diff` section is used, where we exclude the conflicting fields from comparison.
+
+[previous]: ./03_installing_calico.md
+[resolve conflicts]: https://fleet.rancher.io/bundle-diffs
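+
+For reference, the `diff` section of the included `fleet.yaml` takes the following shape, removing the in-place modified field from comparison:
+
+```yaml
+diff:
+  comparePatches:
+  - apiVersion: operator.tigera.io/v1
+    kind: Installation
+    name: default
+    operations:
+    - {"op":"remove", "path":"/spec/kubernetesProvider"}
+```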
+
+Then we specify the `targets.yaml` file, which declares selection rules for this `fleet.yaml` configuration. In our case, we match clusters carrying the `cni: calico` label:
+
+```yaml
+{{#include ../../../fleet/applications/calico/targets.yaml}}
+```
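+
+Other selection mechanisms can be used here as well; for example, a sketch that targets a cluster group instead (the group name is hypothetical):
+
+```yaml
+targets:
+- clusterGroup: rke2-clusters
+```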
+
+Once everything is ready, we need to apply our `GitRepo` in the `default` namespace:
+
+```yaml
+{{#include ../../../testdata/gitrepo-calico.yaml}}
+```
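+
+The included manifest follows the standard Fleet `GitRepo` shape; a minimal sketch (the `branch` and `paths` values are assumptions based on this repository's layout):
+
+```yaml
+apiVersion: fleet.cattle.io/v1alpha1
+kind: GitRepo
+metadata:
+  name: calico
+  namespace: default
+spec:
+  repo: https://github.com/rancher-sandbox/cluster-api-addon-provider-fleet.git
+  branch: main # assumed branch
+  paths:
+  - fleet/applications/calico # assumed path to the fleet.yaml and targets.yaml above
+```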
+
+```bash
+> kubectl apply -f testdata/gitrepo-calico.yaml
+gitrepo.fleet.cattle.io/calico created
+# After some time
+> kubectl get gitrepo
+NAME REPO COMMIT BUNDLEDEPLOYMENTS-READY STATUS
+calico https://github.com/rancher-sandbox/cluster-api-addon-provider-fleet.git 62b4fe6944687e02afb331b9e1839e33c539f0c7 1/1
+```
+
+Now our cluster has `calico` installed, and all nodes are marked as `Ready`:
+
+```bash
+# exec into one of the CP node containers
+> docker exec -it fef3427009f6 /bin/bash
+root@docker-demo-control-plane-krtnt:/#
+root@docker-demo-control-plane-krtnt:/# kubectl get pods -n calico-system --kubeconfig /var/lib/rancher/rke2/server/cred/api-server.kubeconfig
+NAME READY STATUS RESTARTS AGE
+calico-kube-controllers-55cbcc7467-j5bbd 1/1 Running 0 3m30s
+calico-node-mbrqg 1/1 Running 0 3m30s
+calico-node-wlbwn 1/1 Running 0 3m30s
+calico-typha-f48c7ddf7-kbq6d 1/1 Running 0 3m30s
+csi-node-driver-87tlx 2/2 Running 0 3m30s
+csi-node-driver-99pqw 2/2 Running 0 3m30s
+```
+
+## Demo
+
+You can follow along with the demo to verify that your deployment matches the expected result:
+
+
\ No newline at end of file
diff --git a/docs/src/04_reference/01_import-strategy.md b/docs/src/04_reference/01_import-strategy.md
index 7a6456c..1b5de75 100644
--- a/docs/src/04_reference/01_import-strategy.md
+++ b/docs/src/04_reference/01_import-strategy.md
@@ -19,8 +19,6 @@ Fleet mainly relies on `Cluster` labels, `Cluster` names and `ClusterGroups` whe
`FleetAddonConfig` provides several configuration options to define clusters to import.
-**Note: Please be aware that chaning selection configuration requires restart of the `CAAPF` instance, as these selection options directly translate into watch configurations for controllers established on the `API` server.**
-
### Namespace Label Selection
This section defines how to select namespaces based on specific labels. The `namespaceSelector` field ensures that the import strategy applies only to namespaces that have the label `import: "true"`. This is useful for scoping automatic import to specific namespaces rather than applying it cluster-wide.
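+
+For example, a `FleetAddonConfig` sketch enabling this selection (assuming the `addons.cluster.x-k8s.io/v1alpha1` API group and the `cluster.selectors` field layout of this provider):
+
+```yaml
+apiVersion: addons.cluster.x-k8s.io/v1alpha1
+kind: FleetAddonConfig
+metadata:
+  name: fleet-addon-config
+spec:
+  cluster:
+    selectors:
+      namespaceSelector:
+        matchLabels:
+          import: "true"
+```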
diff --git a/fleet/applications/calico/fleet.yaml b/fleet/applications/calico/fleet.yaml
new file mode 100644
index 0000000..9b57067
--- /dev/null
+++ b/fleet/applications/calico/fleet.yaml
@@ -0,0 +1,27 @@
+helm:
+ releaseName: projectcalico
+ repo: https://docs.tigera.io/calico/charts
+ chart: tigera-operator
+ templateValues:
+ installation: |-
+ cni:
+ type: Calico
+ ipam:
+ type: HostLocal
+ calicoNetwork:
+ bgp: Disabled
+ mtu: 1350
+ ipPools:
+ ${- range $cidr := .ClusterValues.Cluster.spec.clusterNetwork.pods.cidrBlocks }
+ - cidr: "${ $cidr }"
+ encapsulation: None
+ natOutgoing: Enabled
+ nodeSelector: all()${- end}
+
+diff:
+ comparePatches:
+ - apiVersion: operator.tigera.io/v1
+ kind: Installation
+ name: default
+ operations:
+ - {"op":"remove", "path":"/spec/kubernetesProvider"}
\ No newline at end of file
diff --git a/fleet/applications/calico/targets.yaml b/fleet/applications/calico/targets.yaml
new file mode 100644
index 0000000..60e339d
--- /dev/null
+++ b/fleet/applications/calico/targets.yaml
@@ -0,0 +1,4 @@
+targets:
+- clusterSelector:
+ matchLabels:
+ cni: calico
diff --git a/justfile b/justfile
index 6a07757..0018916 100644
--- a/justfile
+++ b/justfile
@@ -57,7 +57,6 @@ compile features="": _create-out-dir
[private]
_build features="":
- just compile {{features}}
docker buildx build -t {{ORG}}/{{NAME}}:{{TAG}} .
# docker build base
@@ -77,6 +76,10 @@ docker-build:
docker-push:
docker push {{ORG}}/{{NAME}}:{{TAG}}
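+
+# Build the docker image and load it into the dev kind cluster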
+build-and-load:
+ docker build . -t {{ORG}}/{{NAME}}:{{TAG}}
+ kind load docker-image {{ORG}}/{{NAME}}:{{TAG}} --name dev
+
load-base features="":
just _build {{features}}
kind load docker-image {{ORG}}/{{NAME}}:{{TAG}} --name dev
@@ -100,14 +103,30 @@ deploy-kindnet:
deploy-calico:
kubectl --context kind-dev apply -f testdata/helm.yaml
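+
+# Deploy Calico CNI via a GitRepo pointing at the current repository and branch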
+deploy-calico-gitrepo: _download-yq
+ #!/usr/bin/env bash
+ set -euxo pipefail
+ repo=`git remote get-url origin`
+ branch=`git branch --show-current`
+ cp testdata/gitrepo-calico.yaml {{OUT_DIR}}/gitrepo-calico.yaml
+ yq -i ".spec.repo = \"${repo}\"" {{OUT_DIR}}/gitrepo-calico.yaml
+ yq -i ".spec.branch = \"${branch}\"" {{OUT_DIR}}/gitrepo-calico.yaml
+ kubectl apply -f {{OUT_DIR}}/gitrepo-calico.yaml
+
# Deploy an example app bundle to the cluster
deploy-app:
kubectl --context kind-dev apply -f testdata/bundle.yaml
# Deploy child cluster using docker & kubeadm
deploy-child-cluster:
+ kind delete cluster --name docker-demo || true
kubectl --context kind-dev apply -f testdata/cluster_docker_kcp.yaml
+# Deploy child cluster using docker & rke2
+deploy-child-rke2-cluster:
+ kind delete cluster --name docker-demo || true
+ kubectl --context kind-dev apply -f testdata/cluster_docker_rke2.yaml
+
# Deploy child cluster-call based cluster using docker & kubeadm
deploy-child-cluster-class:
kind delete cluster --name capi-quickstart || true
@@ -115,11 +134,7 @@ deploy-child-cluster-class:
# Add and update helm repos used
update-helm-repos:
- #helm repo add gitea-charts https://dl.gitea.com/charts/
helm repo add fleet https://rancher.github.io/fleet-helm-charts/
- #helm repo add jetstack https://charts.jetstack.io
- #helm repo add traefik https://traefik.github.io/charts
- #helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
# Install fleet into the k8s cluster
@@ -136,7 +151,7 @@ install-capi: _download-clusterctl
# Deploy will deploy the operator
deploy features="": _download-kustomize
just generate {{features}}
- just load-base {{features}}
+ just build-and-load
kustomize build config/default | kubectl apply -f -
kubectl --context kind-dev apply -f testdata/config.yaml
kubectl wait fleetaddonconfigs fleet-addon-config --for=jsonpath='{.status.installedVersion}' --timeout=150s
@@ -153,6 +168,12 @@ test-import: start-dev deploy deploy-child-cluster deploy-kindnet deploy-app &&
kubectl wait cluster --timeout=500s --for=condition=ControlPlaneReady=true docker-demo
kubectl wait clusters.fleet.cattle.io --timeout=300s --for=condition=Ready=true docker-demo
+# Full e2e test of importing an RKE2 cluster in fleet
+test-import-rke2: start-dev deploy deploy-child-rke2-cluster deploy-calico-gitrepo deploy-app
+ kubectl wait pods --for=condition=Ready --timeout=150s --all --all-namespaces
+ kubectl wait cluster --timeout=500s --for=condition=ControlPlaneReady=true docker-demo
+ kubectl wait clusters.fleet.cattle.io --timeout=300s --for=condition=Ready=true docker-demo
+
collect-test-import:
-just collect-artifacts dev
-just collect-artifacts docker-demo
diff --git a/src/api/capi_cluster.rs b/src/api/capi_cluster.rs
index 59d47b6..22903c4 100644
--- a/src/api/capi_cluster.rs
+++ b/src/api/capi_cluster.rs
@@ -1,5 +1,8 @@
use cluster_api_rs::capi_cluster::{ClusterSpec, ClusterStatus};
-use kube::{api::{ObjectMeta, TypeMeta}, Resource};
+use kube::{
+ api::{ObjectMeta, TypeMeta},
+ Resource,
+};
use serde::{Deserialize, Serialize};
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
diff --git a/src/api/capi_clusterclass.rs b/src/api/capi_clusterclass.rs
index 7482c1a..e7af08e 100644
--- a/src/api/capi_clusterclass.rs
+++ b/src/api/capi_clusterclass.rs
@@ -1,5 +1,8 @@
use cluster_api_rs::capi_clusterclass::{ClusterClassSpec, ClusterClassStatus};
-use kube::{api::{ObjectMeta, TypeMeta}, Resource};
+use kube::{
+ api::{ObjectMeta, TypeMeta},
+ Resource,
+};
use serde::{Deserialize, Serialize};
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
diff --git a/src/api/fleet_addon_config.rs b/src/api/fleet_addon_config.rs
index f731331..e91b504 100644
--- a/src/api/fleet_addon_config.rs
+++ b/src/api/fleet_addon_config.rs
@@ -1,4 +1,4 @@
-use fleet_api_rs::fleet_cluster::ClusterAgentEnvVars;
+use fleet_api_rs::fleet_cluster::{ClusterAgentEnvVars, ClusterAgentTolerations};
use k8s_openapi::{
api::core::v1::ObjectReference, apimachinery::pkg::apis::meta::v1::LabelSelector,
};
@@ -107,6 +107,10 @@ pub struct ClusterConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub agent_namespace: Option<String>,
+ /// Agent taint toleration settings for every cluster
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub agent_tolerations: Option<Vec<ClusterAgentTolerations>>,
+
/// Host network allows to deploy agent configuration using hostNetwork: true setting
/// which eludes dependency on the CNI configuration for the cluster.
#[serde(skip_serializing_if = "Option::is_none")]
@@ -134,6 +138,32 @@ impl ClusterConfig {
.unwrap_or(AGENT_NAMESPACE.to_string())
}
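+
+ /// Agent taint tolerations applied to every imported cluster: explicit
+ /// `agent_tolerations` from the config take precedence, otherwise defaults
+ /// tolerating common not-ready and uninitialized node taints are used.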
+ pub(crate) fn agent_tolerations(&self) -> Vec<ClusterAgentTolerations> {
+ let agent_tolerations = vec![
+ ClusterAgentTolerations {
+ effect: Some("NoSchedule".into()),
+ operator: Some("Exists".into()),
+ key: Some("node.kubernetes.io/not-ready".into()),
+ ..Default::default()
+ },
+ ClusterAgentTolerations {
+ effect: Some("NoSchedule".into()),
+ operator: Some("Exists".into()),
+ key: Some("node.cluster.x-k8s.io/uninitialized".into()),
+ ..Default::default()
+ },
+ ClusterAgentTolerations {
+ effect: Some("NoSchedule".into()),
+ operator: Some("Equal".into()),
+ key: Some("node.cloudprovider.kubernetes.io/uninitialized".into()),
+ value: Some("true".into()),
+ ..Default::default()
+ },
+ ];
+
+ self.agent_tolerations.clone().unwrap_or(agent_tolerations)
+ }
+
#[cfg(feature = "agent-initiated")]
pub(crate) fn agent_initiated_connection(&self) -> bool {
self.agent_initiated.filter(|&set| set).is_some()
@@ -167,6 +197,7 @@ impl Default for ClusterConfig {
selectors: Default::default(),
patch_resource: Some(true),
agent_env_vars: None,
+ agent_tolerations: None,
}
}
}
@@ -233,11 +264,9 @@ impl NamingStrategy {
#[serde(rename_all = "camelCase")]
pub struct Selectors {
/// Namespace label selector. If set, only clusters in the namespace matching label selector will be imported.
- /// WARN: this field controls the state of opened watches to the cluster. If changed, requires controller to be reloaded.
pub namespace_selector: LabelSelector,
/// Cluster label selector. If set, only clusters matching label selector will be imported.
- /// WARN: this field controls the state of opened watches to the cluster. If changed, requires controller to be reloaded.
pub selector: LabelSelector,
}
diff --git a/src/api/fleet_cluster.rs b/src/api/fleet_cluster.rs
index 28fdbe8..fc8e3f6 100644
--- a/src/api/fleet_cluster.rs
+++ b/src/api/fleet_cluster.rs
@@ -1,6 +1,9 @@
-use serde::{Deserialize, Serialize};
use fleet_api_rs::fleet_cluster::{ClusterSpec, ClusterStatus};
-use kube::{api::{ObjectMeta, TypeMeta}, Resource};
+use kube::{
+ api::{ObjectMeta, TypeMeta},
+ Resource,
+};
+use serde::{Deserialize, Serialize};
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
#[resource(inherit = fleet_api_rs::fleet_cluster::Cluster)]
diff --git a/src/api/fleet_cluster_registration_token.rs b/src/api/fleet_cluster_registration_token.rs
index 64ae322..08999eb 100644
--- a/src/api/fleet_cluster_registration_token.rs
+++ b/src/api/fleet_cluster_registration_token.rs
@@ -1,8 +1,11 @@
-use serde::{Deserialize, Serialize};
use fleet_api_rs::fleet_cluster_registration_token::{
ClusterRegistrationTokenSpec, ClusterRegistrationTokenStatus,
};
-use kube::{api::{ObjectMeta, TypeMeta}, Resource};
+use kube::{
+ api::{ObjectMeta, TypeMeta},
+ Resource,
+};
+use serde::{Deserialize, Serialize};
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
#[resource(inherit = fleet_api_rs::fleet_cluster_registration_token::ClusterRegistrationToken)]
diff --git a/src/api/fleet_clustergroup.rs b/src/api/fleet_clustergroup.rs
index 981aac7..b93466f 100644
--- a/src/api/fleet_clustergroup.rs
+++ b/src/api/fleet_clustergroup.rs
@@ -1,6 +1,9 @@
-use serde::{Deserialize, Serialize};
use fleet_api_rs::fleet_clustergroup::{ClusterGroupSpec, ClusterGroupStatus};
-use kube::{api::{ObjectMeta, TypeMeta}, Resource};
+use kube::{
+ api::{ObjectMeta, TypeMeta},
+ Resource,
+};
+use serde::{Deserialize, Serialize};
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
#[resource(inherit = fleet_api_rs::fleet_clustergroup::ClusterGroup)]
diff --git a/src/controller.rs b/src/controller.rs
index 00a79d1..128ec5f 100644
--- a/src/controller.rs
+++ b/src/controller.rs
@@ -6,13 +6,16 @@ use crate::api::fleet_clustergroup::ClusterGroup;
use crate::controllers::addon_config::FleetConfig;
use crate::controllers::controller::{fetch_config, Context, FleetController};
use crate::metrics::Diagnostics;
+use crate::multi_dispatcher::{broadcaster, BroadcastStream, MultiDispatcher};
use crate::{Error, Metrics};
use clap::Parser;
use futures::channel::mpsc;
-use futures::StreamExt;
+use futures::stream::SelectAll;
+use futures::{Stream, StreamExt};
use k8s_openapi::api::core::v1::Namespace;
+use kube::api::{DynamicObject, ListParams};
use kube::core::DeserializeGuard;
use kube::runtime::reflector::ObjectRef;
use kube::runtime::{metadata_watcher, predicates, reflector, watcher, WatchStreamExt};
@@ -26,16 +29,22 @@ use kube::{
},
};
use tokio::sync::Mutex;
+use tokio::time::sleep;
use std::future;
use std::ops::Deref;
+use std::pin::Pin;
use std::sync::Arc;
use tokio::{sync::RwLock, time::Duration};
-use tracing::{self, warn};
+use tracing::{self, info, warn};
+
+type DynamicStream = SelectAll<
+ Pin<Box<dyn Stream<Item = Result<watcher::Event<DynamicObject>, watcher::Error>> + Send>>,
+>;
/// State shared between the controller and the web server
-#[derive(Clone, Default)]
+#[derive(Clone)]
pub struct State {
/// Diagnostics populated by the reconciler
diagnostics: Arc<RwLock<Diagnostics>>,
@@ -45,6 +54,11 @@ pub struct State {
/// Additional flags for controller
pub flags: Flags,
+
+ // dispatcher
+ dispatcher: MultiDispatcher,
+ // shared stream of dynamic events
+ stream: BroadcastStream<DynamicStream>,
}
#[derive(Parser, Debug, Clone, Default)]
@@ -55,6 +69,12 @@ pub struct Flags {
}
/// State wrapper around the controller outputs for the web server
+impl Default for State {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
impl State {
pub fn new() -> Self {
let registry = Default::default();
@@ -62,7 +82,9 @@ impl State {
metrics: Metrics::default().register(&registry).unwrap(),
registry,
flags: Flags::parse(),
- ..Default::default()
+ dispatcher: MultiDispatcher::new(128),
+ diagnostics: Default::default(),
+ stream: BroadcastStream::new(Default::default()),
}
}
@@ -82,6 +104,8 @@ impl State {
client,
metrics: self.metrics.clone(),
diagnostics: self.diagnostics.clone(),
+ dispatcher: self.dispatcher.clone(),
+ stream: self.stream.clone(),
})
}
}
@@ -90,20 +114,53 @@ pub async fn run_fleet_addon_config_controller(state: State) {
let client = Client::try_default()
.await
.expect("failed to create kube Client");
- let api: Api<FleetAddonConfig> = Api::all(client.clone());
- let fleet_addon_config_controller = Controller::new(api, watcher::Config::default())
- .watches(
- Api::<DeserializeGuard<FleetConfig>>::all(client.clone()),
- Config::default().fields("metadata.name=fleet-controller"),
- |config| config.0.ok().map(|_| ObjectRef::new("fleet-addon-config")),
- )
- .run(
- FleetAddonConfig::reconcile_config_sync,
- error_policy,
- state.to_context(client.clone()),
- )
+
+ let config_controller = Controller::new(
+ Api::<FleetAddonConfig>::all(client.clone()),
+ Config::default().any_semantic(),
+ )
+ .watches(
+ Api::<DeserializeGuard<FleetConfig>>::all(client.clone()),
+ Config::default().fields("metadata.name=fleet-controller"),
+ |config| config.0.ok().map(|_| ObjectRef::new("fleet-addon-config")),
+ )
+ .shutdown_on_signal()
+ .run(
+ FleetAddonConfig::reconcile_config_sync,
+ error_policy,
+ state.to_context(client.clone()),
+ )
+ .for_each(|_| futures::future::ready(()));
+
+ let dynamic_watches_controller = Controller::new(
+ Api::<FleetAddonConfig>::all(client.clone()),
+ Config::default().any_semantic(),
+ )
+ .shutdown_on_signal()
+ .run(
+ FleetAddonConfig::reconcile_dynamic_watches,
+ error_policy,
+ state.to_context(client.clone()),
+ )
+ .for_each(|_| futures::future::ready(()));
+
+ let watcher = broadcaster(state.dispatcher.clone(), state.stream.clone())
.for_each(|_| futures::future::ready(()));
- tokio::join!(fleet_addon_config_controller);
+
+ // Reconcile initial state of watches
+ Arc::new(
+ fetch_config(client.clone())
+ .await
+ .expect("failed to get FleetAddonConfig resource"),
+ )
+ .update_watches(state.to_context(client.clone()))
+ .await
+ .expect("Initial dynamic watches setup to succeed");
+
+ tokio::select! {
+ _ = watcher => {panic!("This should not happen before controllers exit")},
+ _ = futures::future::join(dynamic_watches_controller, config_controller) => {}
+ };
}
pub async fn run_fleet_helm_controller(state: State) {
@@ -112,6 +169,7 @@ pub async fn run_fleet_helm_controller(state: State) {
.expect("failed to create kube Client");
let api: Api<FleetAddonConfig> = Api::all(client.clone());
let fleet_addon_config_controller = Controller::new(api, watcher::Config::default())
+ .shutdown_on_signal()
.run(
FleetAddonConfig::reconcile_helm,
error_policy,
@@ -127,29 +185,21 @@ pub async fn run_cluster_controller(state: State) {
.await
.expect("failed to create kube Client");
+ loop {
+ let clusters = Api::<fleet_cluster::Cluster>::all(client.clone());
+ if let Err(e) = clusters.list(&ListParams::default().limit(1)).await {
+ info!("Fleet Clusters are not queryable; {e:?}. Is the CRD installed?");
+ sleep(Duration::new(5, 0)).await;
+ continue;
+ }
+
+ break;
+ }
+
let config = fetch_config(client.clone())
.await
.expect("failed to get FleetAddonConfig resource");
- let (reader, writer) = reflector::store();
- let clusters = watcher(
- Api::<Cluster>::all(client.clone()),
- Config::default()
- .labels_from(
- &config
- .cluster_watch()
- .expect("valid cluster label selector"),
- )
- .any_semantic(),
- )
- .default_backoff()
- .modify(|c| {
- c.managed_fields_mut().clear();
- })
- .reflect(writer)
- .touched_objects()
- .predicate_filter(predicates::resource_version);
-
let fleet = metadata_watcher(
Api::<fleet_cluster::Cluster>::all(client.clone()),
Config::default().any_semantic(),
@@ -159,7 +209,8 @@ pub async fn run_cluster_controller(state: State) {
.predicate_filter(predicates::resource_version);
let (invoke_reconcile, namespace_trigger) = mpsc::channel(0);
- let clusters = Controller::for_stream(clusters, reader)
+ let (sub, reader) = state.dispatcher.subscribe();
+ let clusters = Controller::for_shared_stream(sub, reader)
.owns_stream(fleet)
.reconcile_all_on(namespace_trigger)
.shutdown_on_signal()
@@ -178,28 +229,8 @@ pub async fn run_cluster_controller(state: State) {
return clusters.await;
}
- let (reader, writer) = reflector::store();
- let namespaces = metadata_watcher(
- Api::<Namespace>::all(client.clone()),
- Config::default()
- .labels_from(
- &config
- .namespace_selector()
- .expect("valid namespace selector"),
- )
- .any_semantic(),
- )
- .default_backoff()
- .modify(|ns| {
- ns.managed_fields_mut().clear();
- ns.annotations_mut().clear();
- ns.labels_mut().clear();
- })
- .reflect(writer)
- .touched_objects()
- .predicate_filter(predicates::resource_version);
-
- let ns_controller = Controller::for_stream(namespaces, reader)
+ let (sub, reader) = state.dispatcher.subscribe::<Namespace>();
+ let ns_controller = Controller::for_shared_stream(sub, reader)
.shutdown_on_signal()
.run(
Cluster::reconcile_ns,
diff --git a/src/controllers/addon_config.rs b/src/controllers/addon_config.rs
index c7f73dc..bf77999 100644
--- a/src/controllers/addon_config.rs
+++ b/src/controllers/addon_config.rs
@@ -1,19 +1,24 @@
use base64::prelude::*;
+use cluster_api_rs::capi_cluster::Cluster;
+use futures::StreamExt as _;
use std::{fmt::Display, io, str::FromStr, sync::Arc, time::Duration};
-use k8s_openapi::api::core::v1::{ConfigMap, Endpoints};
+use k8s_openapi::api::core::v1::{self, ConfigMap, Endpoints};
use kube::{
- api::{ObjectMeta, Patch, PatchParams, TypeMeta},
+ api::{ApiResource, ObjectMeta, Patch, PatchParams, TypeMeta},
client::scope::Namespace,
core::object::HasSpec,
- runtime::controller::Action,
+ runtime::{
+ controller::Action,
+ watcher::{self, Config},
+ },
Api, Resource, ResourceExt,
};
use serde::{ser, Deserialize, Serialize};
use serde_json::Value;
use serde_with::{serde_as, DisplayFromStr};
use thiserror::Error;
-use tracing::instrument;
+use tracing::{info, instrument};
use crate::{
api::fleet_addon_config::{FleetAddonConfig, Install, InstallOptions, Server},
@@ -132,6 +137,52 @@ impl FleetAddonConfig {
)
.await?;
+ info!("Updated fleet config map");
+
+ Ok(Action::await_change())
+ }
+
+ #[instrument(skip_all, fields(trace_id = display(telemetry::get_trace_id()), name = self.name_any(), namespace = self.namespace()))]
+ pub async fn update_watches(self: Arc<Self>, ctx: Arc<Context>) -> DynamicWatcherResult {
+ info!("Reconciling dynamic watches");
+ let cluster_selector = self.cluster_watch()?;
+ let ns_selector = self.namespace_selector()?;
+
+ let mut stream = ctx.stream.stream.lock().await;
+ stream.clear();
+
+ stream.push(
+ watcher::watcher(
+ Api::all_with(ctx.client.clone(), &ApiResource::erase::<Cluster>(&())),
+ Config::default()
+ .labels_from(&cluster_selector)
+ .any_semantic(),
+ )
+ .boxed(),
+ );
+
+ stream.push(
+ watcher::watcher(
+ Api::all_with(
+ ctx.client.clone(),
+ &ApiResource::erase::<v1::Namespace>(&()),
+ ),
+ Config::default().labels_from(&ns_selector).any_semantic(),
+ )
+ .boxed(),
+ );
+
+ info!("Reconciled dynamic watches to match selectors: namespace={ns_selector}, cluster={cluster_selector}");
+ Ok(Action::await_change())
+ }
+
+ #[instrument(skip_all, fields(trace_id = display(telemetry::get_trace_id()), name = self.name_any(), namespace = self.namespace()))]
+ pub async fn reconcile_dynamic_watches(
+ self: Arc<Self>,
+ ctx: Arc<Context>,
+ ) -> crate::Result<Action> {
+ self.update_watches(ctx).await?;
+
Ok(Action::await_change())
}
@@ -314,6 +365,14 @@ pub enum AddonConfigSyncError {
CommandError(#[from] io::Error),
}
+pub type DynamicWatcherResult<T = Action> = std::result::Result<T, DynamicWatcherError>;
+
+#[derive(Error, Debug)]
+pub enum DynamicWatcherError {
+ #[error("Invalid selector encountered: {0}")]
+ SelectorParseError(#[from] kube::core::ParseExpressionError),
+}
+
mod tests {
#[test]
fn test() {
diff --git a/src/controllers/cluster.rs b/src/controllers/cluster.rs
index 5ac3163..66ca558 100644
--- a/src/controllers/cluster.rs
+++ b/src/controllers/cluster.rs
@@ -8,7 +8,7 @@ use crate::api::fleet_cluster_registration_token::ClusterRegistrationToken;
use crate::api::fleet_clustergroup::ClusterGroup;
use crate::Error;
use cluster_api_rs::capi_cluster::ClusterTopology;
-use fleet_api_rs::fleet_cluster::{ClusterAgentTolerations, ClusterSpec};
+use fleet_api_rs::fleet_cluster::ClusterSpec;
use fleet_api_rs::fleet_clustergroup::{ClusterGroupSelector, ClusterGroupSpec};
use futures::channel::mpsc::Sender;
use k8s_openapi::api::core::v1::Namespace;
@@ -184,22 +184,6 @@ impl Cluster {
None | Some(ClusterTopology { .. }) => self.labels().clone(),
};
- let agent_tolerations = Some(vec![
- ClusterAgentTolerations {
- effect: Some("NoSchedule".into()),
- operator: Some("Equal".into()),
- key: Some("node.kubernetes.io/not-ready".into()),
- ..Default::default()
- },
- ClusterAgentTolerations {
- effect: Some("NoSchedule".into()),
- operator: Some("Equal".into()),
- key: Some("node.cloudprovider.kubernetes.io/uninitialized".into()),
- value: Some("true".into()),
- ..Default::default()
- },
- ]);
-
fleet_cluster::Cluster {
types: Some(TypeMeta::resource::()),
metadata: ObjectMeta {
@@ -216,18 +200,18 @@ impl Cluster {
true => ClusterSpec {
client_id: Some(Alphanumeric.sample_string(&mut rand::rng(), 64)),
agent_namespace: config.agent_install_namespace().into(),
+ agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars,
- agent_tolerations,
..Default::default()
}
.into(),
false => ClusterSpec {
kube_config_secret: Some(format!("{}-kubeconfig", self.name_any())),
agent_namespace: config.agent_install_namespace().into(),
+ agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars,
- agent_tolerations,
..Default::default()
}
.into(),
@@ -236,9 +220,9 @@ impl Cluster {
spec: ClusterSpec {
kube_config_secret: Some(format!("{}-kubeconfig", self.name_any())),
agent_namespace: config.agent_install_namespace().into(),
+ agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars,
- agent_tolerations,
..Default::default()
},
..Default::default()
diff --git a/src/controllers/controller.rs b/src/controllers/controller.rs
index f64d168..61860e6 100644
--- a/src/controllers/controller.rs
+++ b/src/controllers/controller.rs
@@ -1,21 +1,25 @@
use crate::api::fleet_addon_config::FleetAddonConfig;
use crate::controllers::PatchError;
use crate::metrics::Diagnostics;
+use crate::multi_dispatcher::{BroadcastStream, MultiDispatcher};
use crate::{telemetry, Error, Metrics};
use chrono::Utc;
+use futures::stream::SelectAll;
+use futures::Stream;
use k8s_openapi::NamespaceResourceScope;
-use kube::api::{Patch, PatchParams, PostParams};
+use kube::api::{DynamicObject, Patch, PatchParams, PostParams};
use kube::runtime::events::{Event, EventType};
-use kube::runtime::finalizer;
+use kube::runtime::{finalizer, watcher};
use kube::{api::Api, client::Client, runtime::controller::Action};
use serde::de::DeserializeOwned;
use serde::Serialize;
+use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{self, debug, info, instrument};
@@ -26,6 +30,10 @@ use super::{
pub static FLEET_FINALIZER: &str = "fleet.addons.cluster.x-k8s.io";
+type DynamicStream = SelectAll<
+ Pin<Box<dyn Stream<Item = Result<watcher::Event<DynamicObject>, watcher::Error>> + Send>>,
+>;
+
// Context for the reconciler
#[derive(Clone)]
pub struct Context {
@@ -35,6 +43,10 @@ pub struct Context {
pub diagnostics: Arc<RwLock<Diagnostics>>,
/// Prom metrics
pub metrics: Metrics,
+ // Dispatcher for dynamic resource controllers
+ pub dispatcher: MultiDispatcher,
+ // shared stream of dynamic events
+ pub stream: BroadcastStream<DynamicStream>,
}
pub(crate) async fn get_or_create<R>(ctx: Arc<Context>, res: R) -> GetOrCreateResult<R>
diff --git a/src/lib.rs b/src/lib.rs
index fb25207..bde3559 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,7 @@
-use controllers::{addon_config::AddonConfigSyncError, BundleError, SyncError};
+use controllers::{
+ addon_config::{AddonConfigSyncError, DynamicWatcherError},
+ BundleError, SyncError,
+};
use futures::channel::mpsc::TrySendError;
use thiserror::Error;
@@ -19,6 +22,9 @@ pub enum Error {
#[error("Fleet config error: {0}")]
FleetConfigError(#[from] AddonConfigSyncError),
+ #[error("Dynamic watcher error: {0}")]
+ DynamicWatcherError(#[from] DynamicWatcherError),
+
#[error("Namespace trigger error: {0}")]
TriggerError(#[from] TrySendError<()>),
@@ -44,6 +50,7 @@ pub mod controller;
pub use crate::controller::*;
pub mod api;
pub mod controllers;
+mod multi_dispatcher;
pub mod predicates;
/// Log and trace integrations
diff --git a/src/multi_dispatcher.rs b/src/multi_dispatcher.rs
new file mode 100644
index 0000000..3cb1175
--- /dev/null
+++ b/src/multi_dispatcher.rs
@@ -0,0 +1,236 @@
+use std::{
+ hash::Hash,
+ pin::Pin,
+ sync::Arc,
+ task::{Context, Poll},
+};
+
+use async_broadcast::{InactiveReceiver, Receiver, Sender};
+use async_stream::stream;
+use futures::{lock::Mutex, ready, Stream, StreamExt as _};
+use kube::{
+ api::{DynamicObject, GroupVersionKind},
+ runtime::{
+ reflector::{store::Writer, Lookup, Store},
+ watcher::{Event, Result},
+ },
+ Resource,
+};
+use pin_project::pin_project;
+use serde::de::DeserializeOwned;
+
+#[derive(Clone)]
+pub struct MultiDispatcher {
+ dispatch_tx: Sender<Event<DynamicObject>>,
+ // An inactive reader that prevents the channel from closing until the
+ // writer is dropped.
+ _dispatch_rx: InactiveReceiver<Event<DynamicObject>>,
+}
+
+impl MultiDispatcher {
+ #[must_use]
+ pub fn new(buf_size: usize) -> Self {
+ // Create a broadcast (tx, rx) pair
+ let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size);
+ // The tx half will not wait for any receivers to be active before
+ // broadcasting events. If no receivers are active, events will be
+ // buffered.
+ dispatch_tx.set_await_active(false);
+ Self {
+ dispatch_tx,
+ _dispatch_rx: dispatch_rx.deactivate(),
+ }
+ }
+
+ /// Return a handle to a typed subscriber
+ ///
+ /// Multiple subscribe handles may be obtained, by either calling
+ /// `subscribe` multiple times, or by calling `clone()`
+ ///
+ /// Returns a typed subscription handle together with a pre-initialized
+ /// [`Store`] for the subscribed resource type.
+ #[must_use]
+ pub fn subscribe<K>(&self) -> (TypedReflectHandle<K>, Store<K>)
+ where
+ K: Resource + Clone + DeserializeOwned,
+ K::DynamicType: Eq + Clone + Hash + Default,
+ {
+ let sub = TypedReflectHandle::new(self.dispatch_tx.new_receiver());
+ let reader = sub.reader();
+ (sub, reader)
+ }
+
+ /// Broadcast an event to any downstream listeners subscribed on the store
+ pub(crate) async fn broadcast_event(&mut self, event: &Event<DynamicObject>) {
+ match event {
+ // Broadcast stores are pre-initialized
+ Event::InitDone => {}
+ ev => {
+ let _ = self.dispatch_tx.broadcast_direct(ev.clone()).await;
+ }
+ }
+ }
+}
+
+/// `BroadcastStream` allows to stream shared list of dynamic objects,
+/// sources of which can be changed at any moment.
+pub struct BroadcastStream<W> {
+ pub stream: Arc<Mutex<W>>,
+}
+
+impl<W> Clone for BroadcastStream<W> {
+ fn clone(&self) -> Self {
+ Self {
+ stream: self.stream.clone(),
+ }
+ }
+}
+
+impl<W> BroadcastStream<W>
+where
+ W: Stream<Item = Result<Event<DynamicObject>>> + Unpin,
+{
+ pub fn new(stream: Arc<Mutex<W>>) -> Self {
+ Self { stream }
+ }
+}
+
+impl<W> Stream for BroadcastStream<W>
+where
+ W: Stream<Item = Result<Event<DynamicObject>>> + Unpin,
+{
+ type Item = W::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {