diff --git a/src/api/fleet_cluster.rs b/src/api/fleet_cluster.rs index 9f9ce7f..f83fee0 100644 --- a/src/api/fleet_cluster.rs +++ b/src/api/fleet_cluster.rs @@ -8,7 +8,7 @@ use std::collections::BTreeMap; use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString; use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition; -#[derive(CustomResource, Serialize, Deserialize, Clone, Debug)] +#[derive(CustomResource, Serialize, Deserialize, Clone, Debug, Default)] #[kube(group = "fleet.cattle.io", version = "v1alpha1", kind = "Cluster", plural = "clusters")] #[kube(namespaced)] #[kube(status = "ClusterStatus")] diff --git a/src/controller.rs b/src/controller.rs index 5686b63..d86c55d 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -1,9 +1,10 @@ -#![allow(unused_imports, unused_variables, dead_code)] +#![allow(unused_imports, dead_code)] use crate::api::capi_cluster::{Cluster, ClusterStatus}; -use crate::api::fleet_cluster::{Cluster as fleetcluster, ClusterSpec as fleetspec}; +use crate::api::fleet_cluster; use crate::{telemetry, Error, Metrics, Result}; use chrono::{DateTime, Utc}; use futures::StreamExt; +use kube::api::ObjectMeta; use kube::runtime::reflector::Lookup; use kube::{ api::{Api, ListParams, Patch, PatchParams, PostParams, ResourceExt}, @@ -42,13 +43,13 @@ pub struct Context { async fn reconcile(c: Arc, ctx: Arc) -> Result { let trace_id = telemetry::get_trace_id(); Span::current().record("trace_id", &field::display(&trace_id)); - let cfg = Config::default(); + let _cfg = Config::default(); let _timer = ctx.metrics.count_and_measure(); ctx.diagnostics.write().await.last_event = Utc::now(); let cluster_name = c.name_any(); let ns = c.metadata.namespace.as_ref().unwrap(); - let cluster_api: Api = Api::namespaced(ctx.client.clone(), &ns); + let cluster_api: Api = Api::namespaced(ctx.client.clone(), ns); debug!("Reconciling Cluster \"{}\" in {}", cluster_name, ns); finalizer(&cluster_api, FLEET_FINALIZER, c, |event| async { match event { @@ -66,120 +67,93 @@ fn error_policy(doc: Arc, error: &Error, ctx: Arc) -> Action { Action::requeue(Duration::from_secs(5 * 60)) } -impl Cluster { - async fn reconcile(&self, ctx: Arc) -> Result { - let client = ctx.client.clone(); - let _recorder = ctx.diagnostics.read().await.recorder(client.clone(), self); - //let ns = self.namespace().unwrap(); - let ns = self.metadata.namespace.as_ref().unwrap(); - let name = self.name_any(); - let clusters_api: Api = Api::namespaced(client.clone(), &ns); +struct FleetClusterBundle { + cluster: Cluster, + fleet: fleet_cluster::Cluster, +} - let status = self.status().unwrap(); - if !cluster_ready(status) { - return Ok(Action::requeue(Duration::from_secs(30))); +impl From<&Cluster> for FleetClusterBundle { + fn from(cluster: &Cluster) -> Self { + Self { + cluster: cluster.clone(), + fleet: fleet_cluster::Cluster { + metadata: ObjectMeta { + labels: Some(cluster.labels().clone()), + name: Some(cluster.name_any()), + namespace: cluster.meta().namespace.clone(), + ..Default::default() + }, + spec: fleet_cluster::ClusterSpec { + kube_config_secret: Some(format!("{}-kubeconfig", cluster.name_any())), + ..Default::default() + }, + status: Default::default(), + }, } - debug!("cluster has control plane ready \"{}\" in {}", name, ns); - - match self.reconcile_fleet_cluster(ctx.clone()).await { - Err(e) => { - error!("Failed reconciling fleet cluster: {}", e); - } - Ok(_) => { - debug!("Reconciled Fleet cluster") - } - } - - //Ok(Action::requeue(Duration::from_secs(5 * 60)))# - // let jitter = rand::thread_rng().gen_range(0..60); - // Ok(Action::requeue(Duration::from_secs( - // cfg.reconcile_ttl + jitter, - // ))) - Ok(Action::await_change()) - } - - async fn cleanup(&self, ctx: Arc) -> Result { - Ok(Action::await_change()) } +} +impl FleetClusterBundle { pub async fn reconcile_fleet_cluster(&self, ctx: Arc) -> Result { - let cluster_name = self.name_any(); - let ns = self.metadata.namespace.as_ref().unwrap(); - let client = ctx.client.clone(); + let ns = self + .cluster + .metadata + .namespace + .as_ref() + .ok_or(Error::ClusterNamespaceMissing)?; + let fleet_api: Api = Api::namespaced(ctx.client.clone(), ns); - let fleet_api: Api = Api::namespaced(client.clone(), ns); - - let fleet_cluster = match fleet_api.get(&cluster_name).await { - Ok(obj) => Ok(Some(obj)), - Err(kubeerror::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => Ok(None), - Err(err) => Err(err), - }; - - match fleet_cluster { - Err(err) => { - error!("failed gettinmg fleet cluster: {}", err) + let fleet_cluster = match fleet_api.get(self.fleet.name_any().as_str()).await { + Ok(_) => return Ok(Action::await_change()), + Err(kubeerror::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => { + Ok(self.fleet.clone()) } - Ok(fc) => match fc { - Some(_) => { - debug!("fleet cluster already exists, do nothing"); - return Ok(Action::await_change()); - } - None => debug!("fleet cluster does not exist, creating"), - }, + Err(err) => Err(err), } - - let kubeconfig = format!("{}-kubeconfig", cluster_name); - let fcluster = fleetcluster::new( - &cluster_name, - fleetspec { - agent_affinity: None, - agent_env_vars: None, - agent_namespace: None, - agent_resources: None, - agent_tolerations: None, - client_id: None, - kube_config_secret: Some(kubeconfig), - kube_config_secret_namespace: None, - paused: None, - private_repo_url: None, - redeploy_agent_generation: None, - template_values: None, - }, - ); - - //TODO: copy labels from capi cluster + .map_err(Error::FleetClusterLookupError)?; let pp = PostParams::default(); - match fleet_api.create(&pp, &fcluster).await { - Err(err) => { - error!("Failed creating fleet cluster: {}", err); - } - Ok(_) => { - debug!("created fleet cluster"); - } - } + fleet_api + .create(&pp, &fleet_cluster) + .await + .map_err(Error::FleetClusterCreateError)?; Ok(Action::await_change()) } } -fn cluster_ready(status: &ClusterStatus) -> bool { - if let Some(control_plane_ready) = status.control_plane_ready { - if control_plane_ready { - return true; - } +impl Cluster { + async fn reconcile(&self, ctx: Arc) -> Result { + let cluster_bundle: FleetClusterBundle = match self.cluster_ready().map(Into::into) { + Some(b) => b, + None => return Ok(Action::await_change()), + }; + + cluster_bundle.reconcile_fleet_cluster(ctx).await } - if let Some(conditions) = &status.conditions { - let read_condition = conditions.iter().find(|condition| { - condition.type_ == CONTROLPLANE_READY_CONDITION && condition.status == "True" - }); - if read_condition.is_some() { - return true; - } + pub fn cluster_ready(&self) -> Option<&Self> { + let cp_ready = self + .status + .iter() + .filter_map(|status| status.control_plane_ready) + .find(|&ready| ready) + .map(|_| self); + + let ready_condition = self + .status + .iter() + .filter_map(|status| status.conditions.clone()) + .flatten() + .find(|c| c.type_ == CONTROLPLANE_READY_CONDITION && c.status == "True") + .map(|_| self); + + cp_ready.or(ready_condition) } - false + async fn cleanup(&self, _ctx: Arc) -> Result { + Ok(Action::await_change()) + } } /// Diagnostics to be exposed by the web server diff --git a/src/lib.rs b/src/lib.rs index b0e2a3a..2dad576 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,11 +8,20 @@ pub enum Error { #[error("Kube Error: {0}")] KubeError(#[source] kube::Error), + #[error("Fleet cluster lookup error: {0}")] + FleetClusterLookupError(#[source] kube::Error), + + #[error("Fleet cluster create error: {0}")] + FleetClusterCreateError(#[source] kube::Error), + #[error("Finalizer Error: {0}")] // NB: awkward type because finalizer::Error embeds the reconciler error (which is this) // so boxing this error to break cycles FinalizerError(#[source] Box>), + #[error("Missing cluster namespace")] + ClusterNamespaceMissing, + #[error("IllegalDocument")] IllegalDocument, }