diff --git a/Cargo.lock b/Cargo.lock index 7d8868e..a8cca1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -561,6 +561,7 @@ dependencies = [ "chrono", "clap", "cluster-api-rs", + "educe", "fleet-api-rs", "futures", "http 1.3.1", diff --git a/Cargo.toml b/Cargo.toml index 83dcf91..ec327c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ fleet-api-rs = "0.12.3" async-broadcast = "0.7.2" pin-project = "1.1.10" async-stream = "0.3.6" +educe = { version = "0.6.0", features = ["PartialEq"] } [dev-dependencies] assert-json-diff = "2.0.2" diff --git a/justfile b/justfile index e695e96..4011435 100644 --- a/justfile +++ b/justfile @@ -45,7 +45,7 @@ test-unit: cargo test # run clippy -clippy: +clippy: fmt cargo clippy --all-targets --all-features --fix --allow-dirty -- -W clippy::pedantic # compile for musl (for docker image) @@ -94,7 +94,7 @@ start-dev: _cleanup-out-dir _create-out-dir _download-kubectl kind delete cluster --name dev || true kind create cluster --image=kindest/node:v{{KUBE_VERSION}} --config testdata/kind-config.yaml just install-capi - kubectl wait pods --for=condition=Ready --timeout=300s --all --all-namespaces + kubectl wait pods --for=condition=Ready --timeout=500s --all --all-namespaces # Stop the local dev environment stop-dev: @@ -171,13 +171,13 @@ release-manifests: _create-out-dir _download-kustomize test-import: start-dev deploy deploy-child-cluster deploy-kindnet deploy-app && collect-test-import kubectl wait pods --for=condition=Ready --timeout=150s --all --all-namespaces kubectl wait cluster --timeout=500s --for=condition=ControlPlaneReady=true docker-demo - kubectl wait clusters.fleet.cattle.io --timeout=300s --for=condition=Ready=true docker-demo + kubectl wait clusters.fleet.cattle.io --timeout=500s --for=condition=Ready=true docker-demo # Full e2e test of importing cluster in fleet test-import-rke2: start-dev deploy deploy-child-rke2-cluster deploy-calico-gitrepo deploy-app kubectl wait pods --for=condition=Ready --timeout=150s --all --all-namespaces kubectl wait cluster --timeout=500s --for=condition=ControlPlaneReady=true docker-demo - kubectl wait clusters.fleet.cattle.io --timeout=300s --for=condition=Ready=true docker-demo + kubectl wait clusters.fleet.cattle.io --timeout=500s --for=condition=Ready=true docker-demo collect-test-import: -just collect-artifacts dev @@ -206,11 +206,15 @@ collect-artifacts cluster: # Full e2e test of importing cluster and ClusterClass in fleet [private] _test-import-all: - kubectl wait clustergroups.fleet.cattle.io -n clusterclass --timeout=300s --for=create --for=condition=Ready=true quick-start + kubectl wait clustergroups.fleet.cattle.io -n clusterclass --timeout=500s --for=create quick-start + kubectl wait clustergroups.fleet.cattle.io -n clusterclass --timeout=500s --for=condition=Ready=true quick-start # Verify that cluster group created for cluster referencing clusterclass in a different namespace - kubectl wait bundlenamespacemappings.fleet.cattle.io --timeout=300s --for=create -n clusterclass default - kubectl wait clustergroups.fleet.cattle.io --timeout=300s --for=create --for=jsonpath='{.status.clusterCount}=1' --for=condition=Ready=true quick-start.clusterclass - kubectl wait clusters.fleet.cattle.io --timeout=300s --for=create --for=condition=Ready=true capi-quickstart + kubectl wait bundlenamespacemappings.fleet.cattle.io --timeout=500s --for=create -n clusterclass default + kubectl wait clustergroups.fleet.cattle.io --timeout=500s --for=create quick-start.clusterclass + kubectl wait clustergroups.fleet.cattle.io --timeout=500s --for=jsonpath='{.status.clusterCount}=1' quick-start.clusterclass + kubectl wait clustergroups.fleet.cattle.io --timeout=500s --for=condition=Ready=true quick-start.clusterclass + kubectl wait clusters.fleet.cattle.io --timeout=500s --for=create capi-quickstart + kubectl wait clusters.fleet.cattle.io --timeout=500s --for=condition=Ready=true capi-quickstart [private] _test-delete-all: diff --git a/src/api/bundle_namespace_mapping.rs b/src/api/bundle_namespace_mapping.rs index 000b547..c0e2ac3 100644 --- a/src/api/bundle_namespace_mapping.rs +++ b/src/api/bundle_namespace_mapping.rs @@ -2,11 +2,13 @@ use fleet_api_rs::fleet_bundle_namespace_mapping::{ BundleNamespaceMappingBundleSelector, BundleNamespaceMappingNamespaceSelector, }; use kube::{ - api::{ObjectMeta, TypeMeta}, Resource, + api::{ObjectMeta, TypeMeta}, }; use serde::{Deserialize, Serialize}; +use crate::api::comparable::ResourceDiff; + mod mapping { use kube::CustomResource; use schemars::JsonSchema; @@ -32,3 +34,9 @@ pub struct BundleNamespaceMapping { pub bundle_selector: BundleNamespaceMappingBundleSelector, pub namespace_selector: BundleNamespaceMappingNamespaceSelector, } + +impl ResourceDiff for BundleNamespaceMapping { + fn diff(&self, other: &Self) -> bool { + self != other + } +} diff --git a/src/api/capi_cluster.rs b/src/api/capi_cluster.rs index 0a1fe0c..fad91a2 100644 --- a/src/api/capi_cluster.rs +++ b/src/api/capi_cluster.rs @@ -8,8 +8,8 @@ use fleet_api_rs::{ fleet_clustergroup::{ClusterGroupSelector, ClusterGroupSpec}, }; use kube::{ - api::{ObjectMeta, TypeMeta}, CustomResource, Resource, ResourceExt as _, + api::{ObjectMeta, TypeMeta}, }; #[cfg(feature = "agent-initiated")] use rand::distr::{Alphanumeric, SampleString as _}; @@ -20,7 +20,7 @@ use super::{ bundle_namespace_mapping::BundleNamespaceMapping, fleet_addon_config::ClusterConfig, fleet_cluster, - fleet_clustergroup::{ClusterGroup, CLUSTER_CLASS_LABEL, CLUSTER_CLASS_NAMESPACE_LABEL}, + fleet_clustergroup::{CLUSTER_CLASS_LABEL, CLUSTER_CLASS_NAMESPACE_LABEL, ClusterGroup}, }; #[cfg(feature = "agent-initiated")] diff --git a/src/api/comparable.rs b/src/api/comparable.rs new file mode 100644 index 0000000..d471946 --- /dev/null +++ b/src/api/comparable.rs @@ -0,0 +1,6 @@ +// Trait for resources that can be compared +pub(crate) trait ResourceDiff: kube::ResourceExt { + fn diff(&self, other: &Self) -> bool { + self.meta() != other.meta() + } +} diff --git a/src/api/fleet_addon_config.rs b/src/api/fleet_addon_config.rs index 743741e..13582de 100644 --- a/src/api/fleet_addon_config.rs +++ b/src/api/fleet_addon_config.rs @@ -1,18 +1,20 @@ use std::{fmt::Display, str::FromStr}; +use crate::api::comparable::ResourceDiff; +use educe::Educe; use fleet_api_rs::fleet_cluster::{ClusterAgentEnvVars, ClusterAgentTolerations}; use k8s_openapi::{ api::core::v1::{ConfigMap, ObjectReference}, apimachinery::pkg::apis::meta::v1::{Condition, LabelSelector}, }; use kube::{ + CustomResource, KubeSchema, Resource, api::{ObjectMeta, TypeMeta}, core::{ParseExpressionError, Selector}, - CustomResource, KubeSchema, Resource, }; use schemars::JsonSchema; -use serde::{ser, Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; +use serde::{Deserialize, Serialize, ser}; +use serde_with::{DisplayFromStr, serde_as}; use serde_yaml::Value; pub const AGENT_NAMESPACE: &str = "fleet-addon-agent"; @@ -20,7 +22,7 @@ pub const EXPERIMENTAL_OCI_STORAGE: &str = "EXPERIMENTAL_OCI_STORAGE"; pub const EXPERIMENTAL_HELM_OPS: &str = "EXPERIMENTAL_HELM_OPS"; /// This provides a config for fleet addon functionality -#[derive(CustomResource, Deserialize, Serialize, Clone, Default, Debug, KubeSchema)] +#[derive(CustomResource, Deserialize, Serialize, Clone, Default, Debug, KubeSchema, PartialEq)] #[kube( kind = "FleetAddonConfig", group = "addons.cluster.x-k8s.io", @@ -63,6 +65,11 @@ impl Default for FleetAddonConfig { } } } +impl ResourceDiff for FleetAddonConfig { + fn diff(&self, _: &Self) -> bool { + true + } +} #[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)] #[serde(rename_all = "camelCase")] @@ -73,7 +80,7 @@ pub struct FleetAddonConfigStatus { pub conditions: Vec, } -#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)] #[serde(rename_all = "camelCase")] pub struct ClusterClassConfig { /// Setting to disable setting owner references on the created resources @@ -95,7 +102,7 @@ impl Default for ClusterClassConfig { } } -#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)] #[serde(rename_all = "camelCase")] pub struct ClusterConfig { /// Apply a `ClusterGroup` for a `ClusterClass` referenced from a different namespace. @@ -152,13 +159,21 @@ pub struct FleetSettings { pub data: Option, } +impl ResourceDiff for FleetSettings { + fn diff(&self, other: &Self) -> bool { + self.data != other.data + } +} + #[serde_as] -#[derive(Serialize, Deserialize, Default, Clone, Debug)] +#[derive(Serialize, Deserialize, Default, Clone, Debug, Educe)] +#[educe(PartialEq)] pub struct FleetSettingsSpec { #[serde(default)] #[serde_as(as = "DisplayFromStr")] pub fleet: FleetChartValues, + #[educe(PartialEq(ignore))] #[serde(flatten)] pub other: Value, } @@ -238,7 +253,7 @@ impl ClusterConfig { } /// `NamingStrategy` is controlling Fleet cluster naming -#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Default)] +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Default, PartialEq)] pub struct NamingStrategy { /// Specify a prefix for the Cluster name, applied to created Fleet cluster pub prefix: Option, @@ -264,7 +279,7 @@ impl Default for ClusterConfig { } } -#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)] +#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema, PartialEq)] #[serde(rename_all = "camelCase")] pub struct FleetConfig { /// fleet server url configuration options @@ -290,7 +305,7 @@ impl Default for FleetConfig { /// Feature toggles for enabling or disabling experimental functionality. /// This struct controls access to specific experimental features. -#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)] +#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema, PartialEq)] #[serde(rename_all = "camelCase")] pub struct FeatureGates { /// Enables experimental OCI storage support. @@ -360,7 +375,7 @@ impl Default for FeatureGates { /// `FeaturesConfigMap` references a `ConfigMap` where to apply feature flags. /// If a `ConfigMap` is referenced, the controller will update it instead of upgrading the Fleet chart. -#[derive(Clone, Default, Debug, Serialize, Deserialize, JsonSchema)] +#[derive(Clone, Default, Debug, Serialize, Deserialize, JsonSchema, PartialEq)] #[serde(rename_all = "camelCase")] pub struct FeaturesConfigMap { // The reference to a ConfigMap resource @@ -369,7 +384,7 @@ pub struct FeaturesConfigMap { } /// `FleetChartValues` represents Fleet chart values. -#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "camelCase")] pub struct FleetChartValues { pub extra_env: Option>, @@ -378,14 +393,14 @@ pub struct FleetChartValues { } /// `EnvironmentVariable` is a simple name/value pair. -#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "camelCase")] pub struct EnvironmentVariable { pub name: String, pub value: String, } -#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)] +#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema, PartialEq)] #[serde(rename_all = "camelCase")] pub struct FleetInstall { /// Chart version to install @@ -421,14 +436,14 @@ impl Default for Install { } } -#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)] +#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema, PartialEq)] #[serde(rename_all = "camelCase")] pub enum Server { InferLocal(bool), Custom(InstallOptions), } -#[derive(Clone, Default, Debug, Serialize, Deserialize, JsonSchema)] +#[derive(Clone, Default, Debug, Serialize, Deserialize, JsonSchema, PartialEq)] #[serde(rename_all = "camelCase")] pub struct InstallOptions { pub api_server_ca_config_ref: Option, @@ -450,7 +465,7 @@ impl NamingStrategy { } /// Selectors is controlling Fleet import strategy settings. -#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Default)] +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Default, PartialEq)] #[serde(rename_all = "camelCase")] pub struct Selectors { /// Namespace label selector. If set, only clusters in the namespace matching label selector will be imported. diff --git a/src/api/fleet_cluster.rs b/src/api/fleet_cluster.rs index fc8e3f6..e90e3b5 100644 --- a/src/api/fleet_cluster.rs +++ b/src/api/fleet_cluster.rs @@ -1,10 +1,13 @@ use fleet_api_rs::fleet_cluster::{ClusterSpec, ClusterStatus}; use kube::{ + Resource, ResourceExt, api::{ObjectMeta, TypeMeta}, - Resource, }; use serde::{Deserialize, Serialize}; +use crate::api::comparable::ResourceDiff; +use std::collections::HashSet; + #[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)] #[resource(inherit = fleet_api_rs::fleet_cluster::Cluster)] pub struct Cluster { @@ -14,3 +17,60 @@ pub struct Cluster { pub spec: ClusterSpec, pub status: Option, } + +impl ResourceDiff for Cluster { + fn diff(&self, other: &Self) -> bool { + // Resource was just created + if other.status.is_none() { + return true; + } + + let template_values_equal = self + .spec + .template_values + .as_ref() + .unwrap_or(&std::collections::BTreeMap::new()) + .iter() + .all(|(k, v)| { + other + .spec + .template_values + .as_ref() + .unwrap_or(&std::collections::BTreeMap::new()) + .get(k) + == Some(v) + }); + + let spec_equal = template_values_equal + && self.spec.agent_namespace == other.spec.agent_namespace + && self.spec.host_network == other.spec.host_network + && self.spec.agent_env_vars == other.spec.agent_env_vars + && self.spec.agent_tolerations == other.spec.agent_tolerations; + + if !spec_equal { + return true; + } + + let annotations_equal = self + .annotations() + .iter() + .all(|(k, v)| other.annotations().get(k) == Some(v)); + let labels_equal = self + .labels() + .iter() + .all(|(k, v)| other.labels().get(k) == Some(v)); + + let owner_uids: HashSet = other + .owner_references() + .iter() + .map(|r| &r.uid) + .cloned() + .collect(); + let owner_references_equal = self + .owner_references() + .iter() + .all(|self_ref| owner_uids.contains(&self_ref.uid)); + + !annotations_equal || !labels_equal || !owner_references_equal + } +} diff --git a/src/api/fleet_cluster_registration_token.rs b/src/api/fleet_cluster_registration_token.rs index 08999eb..c73b21e 100644 --- a/src/api/fleet_cluster_registration_token.rs +++ b/src/api/fleet_cluster_registration_token.rs @@ -2,11 +2,13 @@ use fleet_api_rs::fleet_cluster_registration_token::{ ClusterRegistrationTokenSpec, ClusterRegistrationTokenStatus, }; use kube::{ - api::{ObjectMeta, TypeMeta}, Resource, + api::{ObjectMeta, TypeMeta}, }; use serde::{Deserialize, Serialize}; +use crate::api::comparable::ResourceDiff; + #[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)] #[resource(inherit = fleet_api_rs::fleet_cluster_registration_token::ClusterRegistrationToken)] pub struct ClusterRegistrationToken { @@ -16,3 +18,9 @@ pub struct ClusterRegistrationToken { pub spec: ClusterRegistrationTokenSpec, pub status: Option, } + +impl ResourceDiff for ClusterRegistrationToken { + fn diff(&self, other: &Self) -> bool { + self.spec != other.spec + } +} diff --git a/src/api/fleet_clustergroup.rs b/src/api/fleet_clustergroup.rs index d793fe1..93676e2 100644 --- a/src/api/fleet_clustergroup.rs +++ b/src/api/fleet_clustergroup.rs @@ -1,18 +1,19 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; use fleet_api_rs::fleet_clustergroup::{ ClusterGroupSelector, ClusterGroupSpec, ClusterGroupStatus, }; use k8s_openapi::api::core::v1::ObjectReference; use kube::{ + Resource, ResourceExt as _, api::{ObjectMeta, TypeMeta}, core::{Expression, Selector}, runtime::reflector::ObjectRef, - Resource, ResourceExt as _, }; use serde::{Deserialize, Serialize}; use super::capi_clusterclass::ClusterClass; +use crate::api::comparable::ResourceDiff; pub static CLUSTER_CLASS_LABEL: &str = "clusterclass-name.fleet.addons.cluster.x-k8s.io"; pub static CLUSTER_CLASS_NAMESPACE_LABEL: &str = @@ -28,6 +29,32 @@ pub struct ClusterGroup { pub status: Option, } +impl ResourceDiff for ClusterGroup { + fn diff(&self, other: &Self) -> bool { + let annotations_equal = self + .annotations() + .iter() + .all(|(k, v)| other.annotations().get(k) == Some(v)); + let labels_equal = self + .labels() + .iter() + .all(|(k, v)| other.labels().get(k) == Some(v)); + + let owner_uids: HashSet = other + .owner_references() + .iter() + .map(|r| &r.uid) + .cloned() + .collect(); + let owner_references_equal = self + .owner_references() + .iter() + .all(|self_ref| owner_uids.contains(&self_ref.uid)); + + self.spec != other.spec || !annotations_equal || !labels_equal || !owner_references_equal + } +} + impl ClusterGroup { pub(crate) fn cluster_class_namespace(&self) -> Option { self.labels() diff --git a/src/api/mod.rs b/src/api/mod.rs index 4a6e7d2..3378bfc 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,6 +1,7 @@ pub mod bundle_namespace_mapping; pub mod capi_cluster; pub mod capi_clusterclass; +pub mod comparable; pub mod fleet_addon_config; pub mod fleet_cluster; #[cfg(feature = "agent-initiated")] diff --git a/src/controller.rs b/src/controller.rs index 4efc7b2..8607a7b 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -5,9 +5,9 @@ use crate::api::fleet_addon_config::FleetAddonConfig; use crate::api::fleet_cluster; use crate::api::fleet_clustergroup::ClusterGroup; use crate::controllers::addon_config::FleetConfig; -use crate::controllers::controller::{fetch_config, Context, DynamicStream, FleetController}; +use crate::controllers::controller::{Context, DynamicStream, FleetController, fetch_config}; use crate::metrics::Diagnostics; -use crate::multi_dispatcher::{broadcaster, BroadcastStream, MultiDispatcher}; +use crate::multi_dispatcher::{BroadcastStream, MultiDispatcher, broadcaster}; use crate::{Error, Metrics}; use chrono::Local; @@ -17,9 +17,10 @@ use futures::{Stream, StreamExt}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time}; use kube::api::{Patch, PatchParams}; use kube::core::DeserializeGuard; -use kube::runtime::reflector::store::Writer; use kube::runtime::reflector::ObjectRef; -use kube::runtime::{metadata_watcher, predicates, reflector, watcher, WatchStreamExt}; +use kube::runtime::reflector::store::Writer; +use kube::runtime::{WatchStreamExt, metadata_watcher, predicates, reflector, watcher}; +use kube::{Resource, ResourceExt}; use kube::{ api::Api, client::Client, @@ -28,7 +29,6 @@ use kube::{ watcher::Config, }, }; -use kube::{Resource, ResourceExt}; use tokio::sync::Barrier; use std::collections::BTreeMap; diff --git a/src/controllers/addon_config.rs b/src/controllers/addon_config.rs index 0400f2d..1a53261 100644 --- a/src/controllers/addon_config.rs +++ b/src/controllers/addon_config.rs @@ -1,5 +1,6 @@ use base64::prelude::*; use chrono::Local; +use educe::Educe; use futures::StreamExt as _; use std::{fmt::Display, io, str::FromStr, sync::Arc, time::Duration}; @@ -8,24 +9,25 @@ use k8s_openapi::{ apimachinery::pkg::apis::meta::v1::{Condition, Time}, }; use kube::{ - api::{ApiResource, DynamicObject, ObjectMeta, Patch, PatchParams, TypeMeta}, + Api, Resource, ResourceExt, + api::{ApiResource, DynamicObject, ObjectMeta, PatchParams, TypeMeta}, client::scope::Namespace, core::object::HasSpec, runtime::{ controller::Action, watcher::{self, Config, Event}, }, - Api, Resource, ResourceExt, }; -use serde::{de::DeserializeOwned, ser, Deserialize, Serialize}; +use serde::{Deserialize, Serialize, de::DeserializeOwned, ser}; use serde_json::Value; -use serde_with::{serde_as, DisplayFromStr}; +use serde_with::{DisplayFromStr, serde_as}; use thiserror::Error; -use tracing::{field::display, info, instrument, Span}; +use tracing::{Span, field::display, info, instrument}; use crate::{ api::{ capi_cluster::Cluster, + comparable::ResourceDiff, fleet_addon_config::{ FeatureGates, FleetAddonConfig, FleetSettings, Install, InstallOptions, Server, }, @@ -34,12 +36,12 @@ use crate::{ }; use super::{ - controller::{patch, Context}, + PatchError, + controller::{Context, patch}, helm::{ self, install::{ChartSearch, FleetChart, HelmOperation}, }, - PatchError, }; #[derive(Resource, Serialize, Deserialize, Default, Clone, Debug)] @@ -51,14 +53,21 @@ pub struct FleetConfig { pub data: FleetConfigSpec, } +impl ResourceDiff for FleetConfig { + fn diff(&self, other: &Self) -> bool { + self.data != other.data + } +} + #[serde_as] -#[derive(Serialize, Deserialize, Default, Clone, Debug)] +#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)] pub struct FleetConfigSpec { #[serde_as(as = "DisplayFromStr")] pub config: FleetConfigData, } -#[derive(Serialize, Deserialize, Default, Clone, Debug)] +#[derive(Serialize, Deserialize, Default, Clone, Debug, Educe)] +#[educe(PartialEq)] pub struct FleetConfigData { #[serde(rename = "apiServerURL")] pub api_server_url: String, @@ -66,6 +75,7 @@ pub struct FleetConfigData { #[serde(rename = "apiServerCA")] pub api_server_ca: String, + #[educe(PartialEq(ignore))] #[serde(flatten)] pub other: Value, } @@ -155,10 +165,10 @@ impl FleetAddonConfig { } #[instrument(skip_all, fields(reconcile_id, name = self.name_any(), namespace = self.namespace()))] - pub async fn reconcile_config_sync( + async fn sync_fleet_config( self: Arc, ctx: Arc, - ) -> crate::Result { + ) -> ReconcileConfigSyncResult { let _current = Span::current().record("reconcile_id", display(telemetry::get_trace_id())); let ns = Namespace::from("cattle-fleet-system"); let mut fleet_config: FleetConfig = ctx.client.get("fleet-controller", &ns).await?; @@ -173,19 +183,24 @@ impl FleetAddonConfig { fleet_config.meta_mut().managed_fields = None; fleet_config.types = Some(TypeMeta::resource::()); - let api: Api = Api::namespaced(ctx.client.clone(), "cattle-fleet-system"); - api.patch( - &fleet_config.name_any(), + patch( + ctx.clone(), + &mut fleet_config, &PatchParams::apply("addon-provider-fleet").force(), - &Patch::Apply(&fleet_config), ) .await?; - info!("Updated fleet config map"); - Ok(Action::await_change()) } + #[instrument(skip_all, fields(reconcile_id, name = self.name_any(), namespace = self.namespace()))] + pub async fn reconcile_config_sync( + self: Arc, + ctx: Arc, + ) -> crate::Result { + Ok(self.sync_fleet_config(ctx).await?) + } + #[instrument(skip_all, fields(reconcile_id, name = self.name_any(), namespace = self.namespace()))] pub async fn update_watches( self: Arc, @@ -233,7 +248,9 @@ impl FleetAddonConfig { ); } - info!("Reconciled dynamic watches to match selectors: namespace={ns_selector}, cluster={cluster_selector}"); + info!( + "Reconciled dynamic watches to match selectors: namespace={ns_selector}, cluster={cluster_selector}" + ); Ok(Action::await_change()) } @@ -494,6 +511,20 @@ where Ok(ev) } +pub type ReconcileConfigSyncResult = std::result::Result; + +#[derive(Error, Debug)] +pub enum ReconcileConfigSyncError { + #[error("Fleet config map fetch error: {0}")] + FleetConfigFetch(#[from] kube::Error), + + #[error("Addon config sync error: {0}")] + AddonConfigSync(#[from] AddonConfigSyncError), + + #[error("Fleet config map patch error: {0}")] + Patch(#[from] PatchError), +} + pub type FleetPatchResult = std::result::Result; #[derive(Error, Debug)] diff --git a/src/controllers/cluster.rs b/src/controllers/cluster.rs index 6e15853..19a4228 100644 --- a/src/controllers/cluster.rs +++ b/src/controllers/cluster.rs @@ -16,8 +16,8 @@ use kube::api::{ use kube::client::scope; use kube::runtime::watcher::{self, Config}; -use kube::{api::ResourceExt, runtime::controller::Action, Resource}; use kube::{Api, Client}; +use kube::{Resource, api::ResourceExt, runtime::controller::Action}; use serde::Serialize; use serde_json::Value; use tracing::info; @@ -25,7 +25,7 @@ use tracing::info; use std::sync::Arc; use super::controller::{ - fetch_config, get_or_create, patch, Context, FleetBundle, FleetController, + Context, FleetBundle, FleetController, fetch_config, get_or_create, patch, }; use super::{BundleResult, ClusterSyncError, ClusterSyncResult}; @@ -141,7 +141,9 @@ impl FleetBundle for FleetClusterBundle { let class_namespace = mapping.namespace().unwrap_or_default(); let cluster_namespace = mapping.name_any(); - info!("Updated BundleNamespaceMapping for cluster {cluster_name} between class namespace: {class_namespace} and cluster namespace: {cluster_namespace}"); + info!( + "Updated BundleNamespaceMapping for cluster {cluster_name} between class namespace: {class_namespace} and cluster namespace: {cluster_namespace}" + ); } } @@ -152,7 +154,9 @@ impl FleetBundle for FleetClusterBundle { &PatchParams::apply("addon-provider-fleet"), ) .await? - } else { get_or_create(ctx.clone(), cluster).await? }; + } else { + get_or_create(ctx.clone(), cluster).await? + }; #[cfg(feature = "agent-initiated")] if let Some(cluster_registration_token) = self.cluster_registration_token.as_ref() { diff --git a/src/controllers/cluster_class.rs b/src/controllers/cluster_class.rs index 804b538..ae3b58c 100644 --- a/src/controllers/cluster_class.rs +++ b/src/controllers/cluster_class.rs @@ -10,7 +10,7 @@ use kube::runtime::controller::Action; use std::sync::Arc; use super::controller::{ - fetch_config, get_or_create, patch, Context, FleetBundle, FleetController, + Context, FleetBundle, FleetController, fetch_config, get_or_create, patch, }; use super::{BundleResult, GroupSyncResult}; diff --git a/src/controllers/cluster_group.rs b/src/controllers/cluster_group.rs index f4f7239..d68476d 100644 --- a/src/controllers/cluster_group.rs +++ b/src/controllers/cluster_group.rs @@ -9,7 +9,7 @@ use serde_json::json; use std::ops::Deref; use std::sync::Arc; -use super::controller::{patch, Context, FLEET_FINALIZER}; +use super::controller::{Context, FLEET_FINALIZER, patch}; use super::{GroupSyncResult, SyncError}; impl ClusterGroup { diff --git a/src/controllers/controller.rs b/src/controllers/controller.rs index 526b050..4b69081 100644 --- a/src/controllers/controller.rs +++ b/src/controllers/controller.rs @@ -1,12 +1,13 @@ +use crate::api::comparable::ResourceDiff; use crate::api::fleet_addon_config::FleetAddonConfig; use crate::controllers::PatchError; use crate::metrics::Diagnostics; -use crate::multi_dispatcher::{typed_gvk, BroadcastStream, MultiDispatcher}; -use crate::{telemetry, Error, Metrics}; +use crate::multi_dispatcher::{BroadcastStream, MultiDispatcher, typed_gvk}; +use crate::{Error, Metrics, telemetry}; use chrono::Utc; -use futures::stream::SelectAll; use futures::Stream; +use futures::stream::SelectAll; use k8s_openapi::NamespaceResourceScope; use kube::api::{DynamicObject, Patch, PatchParams, PostParams}; @@ -16,15 +17,15 @@ use kube::runtime::{finalizer, watcher}; use kube::{api::Api, client::Client, runtime::controller::Action}; -use serde::de::DeserializeOwned; use serde::Serialize; +use serde::de::DeserializeOwned; use tracing::field::display; use std::fmt::Debug; use std::pin::Pin; use std::sync::Arc; use tokio::sync::{Barrier, RwLock}; -use tracing::{self, debug, info, instrument, Span}; +use tracing::{self, Span, debug, info, instrument}; use super::{ BundleResult, ConfigFetchResult, GetOrCreateError, GetOrCreateResult, PatchResult, SyncError, @@ -119,13 +120,24 @@ pub(crate) async fn patch( where R: Clone + Serialize + DeserializeOwned + Debug, R: kube::Resource, - R: kube::ResourceExt, + R: ResourceDiff, { let ns = res.namespace().unwrap_or(String::from("default")); let api: Api = Api::namespaced(ctx.client.clone(), &ns); res.meta_mut().managed_fields = None; + // Perform patch after comparison + if let Some(existing) = api + .get_opt(&res.name_any()) + .await + .map_err(PatchError::Get)? + { + if !res.diff(&existing) { + return Ok(Action::await_change()); + } + } + api.patch(&res.name_any(), pp, &Patch::Apply(&res)) .await .map_err(PatchError::Patch)?; diff --git a/src/controllers/mod.rs b/src/controllers/mod.rs index ec034d3..3136439 100644 --- a/src/controllers/mod.rs +++ b/src/controllers/mod.rs @@ -84,6 +84,9 @@ pub type PatchResult = std::result::Result; #[derive(Error, Debug)] pub enum PatchError { + #[error("Get error: {0}")] + Get(#[source] kube::Error), + #[error("Patch error: {0}")] Patch(#[source] kube::Error), diff --git a/src/lib.rs b/src/lib.rs index ebe7aee..6c850f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,11 @@ use std::io; use controllers::{ - addon_config::{AddonConfigSyncError, DynamicWatcherError, FleetPatchError}, - helm, BundleError, SyncError, + BundleError, SyncError, + addon_config::{ + AddonConfigSyncError, DynamicWatcherError, FleetPatchError, ReconcileConfigSyncError, + }, + helm, }; use futures::channel::mpsc::TrySendError; use thiserror::Error; @@ -39,6 +42,9 @@ pub enum Error { #[error("Dynamic watcher error: {0}")] DynamicWatcherError(#[from] DynamicWatcherError), + #[error("Reconcile config sync error: {0}")] + ReconcileConfigSync(#[from] ReconcileConfigSyncError), + #[error("Namespace trigger error: {0}")] TriggerError(#[from] TrySendError<()>), diff --git a/src/main.rs b/src/main.rs index 7dc42f5..42875b3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use actix_web::{ - get, middleware, web::Data, App, HttpRequest, HttpResponse, HttpServer, Responder, + App, HttpRequest, HttpResponse, HttpServer, Responder, get, middleware, web::Data, }; -pub use controller::{self, telemetry, State}; +pub use controller::{self, State, telemetry}; use kube::Client; use prometheus::{Encoder, TextEncoder}; diff --git a/src/metrics.rs b/src/metrics.rs index d4fd887..4f54387 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -3,10 +3,10 @@ use std::sync::Arc; use crate::Error; use chrono::{DateTime, Utc}; use kube::{ - runtime::events::{Recorder, Reporter}, Client, ResourceExt, + runtime::events::{Recorder, Reporter}, }; -use prometheus::{histogram_opts, opts, HistogramVec, IntCounter, IntCounterVec, Registry}; +use prometheus::{HistogramVec, IntCounter, IntCounterVec, Registry, histogram_opts, opts}; use serde::Serialize; use tokio::time::Instant; diff --git a/src/multi_dispatcher.rs b/src/multi_dispatcher.rs index 6f51779..a814d69 100644 --- a/src/multi_dispatcher.rs +++ b/src/multi_dispatcher.rs @@ -7,14 +7,14 @@ use std::{ use async_broadcast::{InactiveReceiver, Receiver, Sender}; use async_stream::stream; -use futures::{lock::Mutex, ready, Stream, StreamExt as _}; +use futures::{Stream, StreamExt as _, lock::Mutex, ready}; use kube::{ + Resource, api::{DynamicObject, GroupVersionKind}, runtime::{ - reflector::{store::Writer, Lookup, Store}, + reflector::{Lookup, Store, store::Writer}, watcher::{Event, Result}, }, - Resource, }; use pin_project::pin_project; use serde::de::DeserializeOwned; diff --git a/src/predicates.rs b/src/predicates.rs index 39afd45..68e88e9 100644 --- a/src/predicates.rs +++ b/src/predicates.rs @@ -1,5 +1,5 @@ -use kube::runtime::predicates; use kube::ResourceExt; +use kube::runtime::predicates; pub fn generation_with_deletion(obj: &impl ResourceExt) -> Option { match obj.meta().deletion_timestamp { diff --git a/src/telemetry.rs b/src/telemetry.rs index 856f1cf..4e0b065 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -1,5 +1,5 @@ use opentelemetry::trace::TraceId; -use tracing_subscriber::{prelude::*, EnvFilter, Registry}; +use tracing_subscriber::{EnvFilter, Registry, prelude::*}; /// Fetch an `opentelemetry::trace::TraceId` as hex through the full tracing stack #[must_use]