Refactor error handling and add label propagation

Signed-off-by: Danil Grigorev <danil.grigorev@suse.com>
Danil Grigorev 2024-04-15 15:33:48 +02:00
parent 0f1199d378
commit e9caf1a739
3 changed files with 83 additions and 100 deletions

@@ -8,7 +8,7 @@ use std::collections::BTreeMap;
 use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
 use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
-#[derive(CustomResource, Serialize, Deserialize, Clone, Debug)]
+#[derive(CustomResource, Serialize, Deserialize, Clone, Debug, Default)]
 #[kube(group = "fleet.cattle.io", version = "v1alpha1", kind = "Cluster", plural = "clusters")]
 #[kube(namespaced)]
 #[kube(status = "ClusterStatus")]
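
The `Default` derive added here is what enables the `..Default::default()` struct-update syntax in the controller changes below, replacing the removed block that spelled out every optional spec field by hand. A standalone sketch of the pattern, using a trimmed-down stand-in for the generated `ClusterSpec` (only a few of the real fields are mirrored, purely for illustration):

// Simplified stand-in for the generated Fleet ClusterSpec.
#[derive(Debug, Default)]
struct ClusterSpec {
    kube_config_secret: Option<String>,
    agent_namespace: Option<String>,
    paused: Option<bool>,
}

fn main() {
    // Set only the field that matters; `Default` fills in the rest.
    let spec = ClusterSpec {
        kube_config_secret: Some("my-cluster-kubeconfig".into()),
        ..Default::default()
    };
    println!("{spec:?}");
}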

@@ -1,9 +1,10 @@
-#![allow(unused_imports, unused_variables, dead_code)]
+#![allow(unused_imports, dead_code)]
 use crate::api::capi_cluster::{Cluster, ClusterStatus};
-use crate::api::fleet_cluster::{Cluster as fleetcluster, ClusterSpec as fleetspec};
+use crate::api::fleet_cluster;
 use crate::{telemetry, Error, Metrics, Result};
 use chrono::{DateTime, Utc};
 use futures::StreamExt;
+use kube::api::ObjectMeta;
 use kube::runtime::reflector::Lookup;
 use kube::{
     api::{Api, ListParams, Patch, PatchParams, PostParams, ResourceExt},
@@ -42,13 +43,13 @@ pub struct Context {
 async fn reconcile(c: Arc<Cluster>, ctx: Arc<Context>) -> Result<Action> {
     let trace_id = telemetry::get_trace_id();
     Span::current().record("trace_id", &field::display(&trace_id));
-    let cfg = Config::default();
+    let _cfg = Config::default();
     let _timer = ctx.metrics.count_and_measure();
     ctx.diagnostics.write().await.last_event = Utc::now();
     let cluster_name = c.name_any();
     let ns = c.metadata.namespace.as_ref().unwrap();
-    let cluster_api: Api<Cluster> = Api::namespaced(ctx.client.clone(), &ns);
+    let cluster_api: Api<Cluster> = Api::namespaced(ctx.client.clone(), ns);
     debug!("Reconciling Cluster \"{}\" in {}", cluster_name, ns);
     finalizer(&cluster_api, FLEET_FINALIZER, c, |event| async {
         match event {
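
The hunk above cuts off inside the `finalizer` call. The standard kube-runtime shape it follows is sketched below, assuming this file's imports plus `kube::runtime::finalizer::{finalizer, Event}`; this is a sketch of the pattern, not the verbatim continuation:

// Dispatch finalizer events to the reconcile/cleanup methods and box the
// finalizer error, matching the FinalizerError variant in the error enum.
async fn reconcile_with_finalizer(
    cluster_api: Api<Cluster>,
    c: Arc<Cluster>,
    ctx: Arc<Context>,
) -> Result<Action> {
    finalizer(&cluster_api, FLEET_FINALIZER, c, |event| async {
        match event {
            // Apply: the object exists and is not marked for deletion.
            Event::Apply(cluster) => cluster.reconcile(ctx.clone()).await,
            // Cleanup: the deletion timestamp is set; on Ok the finalizer is removed.
            Event::Cleanup(cluster) => cluster.cleanup(ctx.clone()).await,
        }
    })
    .await
    .map_err(|e| Error::FinalizerError(Box::new(e)))
}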
@@ -66,120 +67,93 @@ fn error_policy(doc: Arc<Cluster>, error: &Error, ctx: Arc<Context>) -> Action {
     Action::requeue(Duration::from_secs(5 * 60))
 }
-impl Cluster {
-    async fn reconcile(&self, ctx: Arc<Context>) -> Result<Action> {
-        let client = ctx.client.clone();
-        let _recorder = ctx.diagnostics.read().await.recorder(client.clone(), self);
-        //let ns = self.namespace().unwrap();
-        let ns = self.metadata.namespace.as_ref().unwrap();
-        let name = self.name_any();
-        let clusters_api: Api<Cluster> = Api::namespaced(client.clone(), &ns);
-        let status = self.status().unwrap();
-        if !cluster_ready(status) {
-            return Ok(Action::requeue(Duration::from_secs(30)));
-        }
-        debug!("cluster has control plane ready \"{}\" in {}", name, ns);
-        match self.reconcile_fleet_cluster(ctx.clone()).await {
-            Err(e) => {
-                error!("Failed reconciling fleet cluster: {}", e);
-            }
-            Ok(_) => {
-                debug!("Reconciled Fleet cluster")
-            }
-        }
-        //Ok(Action::requeue(Duration::from_secs(5 * 60)))#
-        // let jitter = rand::thread_rng().gen_range(0..60);
-        // Ok(Action::requeue(Duration::from_secs(
-        //     cfg.reconcile_ttl + jitter,
-        // )))
-        Ok(Action::await_change())
-    }
-
-    async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
-        Ok(Action::await_change())
-    }
-
-    pub async fn reconcile_fleet_cluster(&self, ctx: Arc<Context>) -> Result<Action> {
-        let cluster_name = self.name_any();
-        let ns = self.metadata.namespace.as_ref().unwrap();
-        let client = ctx.client.clone();
-        let fleet_api: Api<fleetcluster> = Api::namespaced(client.clone(), ns);
-        let fleet_cluster = match fleet_api.get(&cluster_name).await {
-            Ok(obj) => Ok(Some(obj)),
-            Err(kubeerror::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => Ok(None),
-            Err(err) => Err(err),
-        };
-        match fleet_cluster {
-            Err(err) => {
-                error!("failed gettinmg fleet cluster: {}", err)
-            }
-            Ok(fc) => match fc {
-                Some(_) => {
-                    debug!("fleet cluster already exists, do nothing");
-                    return Ok(Action::await_change());
-                }
-                None => debug!("fleet cluster does not exist, creating"),
-            },
-        }
-        let kubeconfig = format!("{}-kubeconfig", cluster_name);
-        let fcluster = fleetcluster::new(
-            &cluster_name,
-            fleetspec {
-                agent_affinity: None,
-                agent_env_vars: None,
-                agent_namespace: None,
-                agent_resources: None,
-                agent_tolerations: None,
-                client_id: None,
-                kube_config_secret: Some(kubeconfig),
-                kube_config_secret_namespace: None,
-                paused: None,
-                private_repo_url: None,
-                redeploy_agent_generation: None,
-                template_values: None,
-            },
-        );
-        //TODO: copy labels from capi cluster
-        let pp = PostParams::default();
-        match fleet_api.create(&pp, &fcluster).await {
-            Err(err) => {
-                error!("Failed creating fleet cluster: {}", err);
-            }
-            Ok(_) => {
-                debug!("created fleet cluster");
-            }
-        }
-        Ok(Action::await_change())
-    }
-}
-
-fn cluster_ready(status: &ClusterStatus) -> bool {
-    if let Some(control_plane_ready) = status.control_plane_ready {
-        if control_plane_ready {
-            return true;
-        }
-    }
-    if let Some(conditions) = &status.conditions {
-        let read_condition = conditions.iter().find(|condition| {
-            condition.type_ == CONTROLPLANE_READY_CONDITION && condition.status == "True"
-        });
-        if read_condition.is_some() {
-            return true;
-        }
-    }
-    false
-}
+struct FleetClusterBundle {
+    cluster: Cluster,
+    fleet: fleet_cluster::Cluster,
+}
+
+impl From<&Cluster> for FleetClusterBundle {
+    fn from(cluster: &Cluster) -> Self {
+        Self {
+            cluster: cluster.clone(),
+            fleet: fleet_cluster::Cluster {
+                metadata: ObjectMeta {
+                    labels: Some(cluster.labels().clone()),
+                    name: Some(cluster.name_any()),
+                    namespace: cluster.meta().namespace.clone(),
+                    ..Default::default()
+                },
+                spec: fleet_cluster::ClusterSpec {
+                    kube_config_secret: Some(format!("{}-kubeconfig", cluster.name_any())),
+                    ..Default::default()
+                },
+                status: Default::default(),
+            },
+        }
+    }
+}
+
+impl FleetClusterBundle {
+    pub async fn reconcile_fleet_cluster(&self, ctx: Arc<Context>) -> Result<Action> {
+        let ns = self
+            .cluster
+            .metadata
+            .namespace
+            .as_ref()
+            .ok_or(Error::ClusterNamespaceMissing)?;
+        let fleet_api: Api<fleet_cluster::Cluster> = Api::namespaced(ctx.client.clone(), ns);
+        let fleet_cluster = match fleet_api.get(self.fleet.name_any().as_str()).await {
+            Ok(_) => return Ok(Action::await_change()),
+            Err(kubeerror::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => {
+                Ok(self.fleet.clone())
+            }
+            Err(err) => Err(err),
+        }
+        .map_err(Error::FleetClusterLookupError)?;
+        let pp = PostParams::default();
+        fleet_api
+            .create(&pp, &fleet_cluster)
+            .await
+            .map_err(Error::FleetClusterCreateError)?;
+        Ok(Action::await_change())
+    }
+}
+
+impl Cluster {
+    async fn reconcile(&self, ctx: Arc<Context>) -> Result<Action> {
+        let cluster_bundle: FleetClusterBundle = match self.cluster_ready().map(Into::into) {
+            Some(b) => b,
+            None => return Ok(Action::await_change()),
+        };
+        cluster_bundle.reconcile_fleet_cluster(ctx).await
+    }
+
+    pub fn cluster_ready(&self) -> Option<&Self> {
+        let cp_ready = self
+            .status
+            .iter()
+            .filter_map(|status| status.control_plane_ready)
+            .find(|&ready| ready)
+            .map(|_| self);
+        let ready_condition = self
+            .status
+            .iter()
+            .filter_map(|status| status.conditions.clone())
+            .flatten()
+            .find(|c| c.type_ == CONTROLPLANE_READY_CONDITION && c.status == "True")
+            .map(|_| self);
+        cp_ready.or(ready_condition)
+    }
+
+    async fn cleanup(&self, _ctx: Arc<Context>) -> Result<Action> {
+        Ok(Action::await_change())
+    }
+}
 /// Diagnostics to be exposed by the web server
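
The `From<&Cluster>` conversion above is where the label propagation from the commit title happens: the CAPI cluster's labels are cloned wholesale onto the Fleet cluster's metadata, so any selector matching the source cluster also matches its Fleet counterpart. A self-contained sketch of the same conversion over simplified stand-in types (the real types come from the generated API modules):

use std::collections::BTreeMap;

// Trimmed-down stand-ins for the CAPI and Fleet cluster types.
#[derive(Clone, Debug, Default)]
struct Meta {
    name: Option<String>,
    namespace: Option<String>,
    labels: Option<BTreeMap<String, String>>,
}

struct CapiCluster {
    metadata: Meta,
}

#[derive(Debug)]
struct FleetCluster {
    metadata: Meta,
}

impl From<&CapiCluster> for FleetCluster {
    fn from(cluster: &CapiCluster) -> Self {
        Self {
            metadata: Meta {
                // Labels are copied wholesale from the source cluster.
                labels: cluster.metadata.labels.clone(),
                name: cluster.metadata.name.clone(),
                namespace: cluster.metadata.namespace.clone(),
            },
        }
    }
}

fn main() {
    let capi = CapiCluster {
        metadata: Meta {
            name: Some("docker-demo".into()),
            namespace: Some("default".into()),
            labels: Some(BTreeMap::from([("env".into(), "test".into())])),
        },
    };
    let fleet = FleetCluster::from(&capi);
    assert_eq!(fleet.metadata.labels, capi.metadata.labels);
    println!("{fleet:?}");
}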
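The rewritten `cluster_ready` replaces the old early-return helper with `Option` combinators: `Option::iter` yields at most one status, so each chain evaluates to `Some(self)` only when its readiness signal is actually present, and the two signals are joined with `or`. A runnable sketch of the same shape over simplified types (the value of `CONTROLPLANE_READY_CONDITION` is assumed here):

// Simplified stand-ins for ClusterStatus and its conditions.
#[derive(Clone)]
struct Condition {
    type_: String,
    status: String,
}

struct Status {
    control_plane_ready: Option<bool>,
    conditions: Option<Vec<Condition>>,
}

struct Cluster {
    status: Option<Status>,
}

// Assumed value; the real constant is defined elsewhere in the crate.
const CONTROLPLANE_READY_CONDITION: &str = "ControlPlaneReady";

impl Cluster {
    // Some(self) when either readiness signal fires, None otherwise.
    fn cluster_ready(&self) -> Option<&Self> {
        let cp_ready = self
            .status
            .iter()
            .filter_map(|s| s.control_plane_ready)
            .find(|&ready| ready)
            .map(|_| self);
        let ready_condition = self
            .status
            .iter()
            .filter_map(|s| s.conditions.clone())
            .flatten()
            .find(|c| c.type_ == CONTROLPLANE_READY_CONDITION && c.status == "True")
            .map(|_| self);
        cp_ready.or(ready_condition)
    }
}

fn main() {
    let ready = Cluster {
        status: Some(Status {
            control_plane_ready: Some(true),
            conditions: None,
        }),
    };
    assert!(ready.cluster_ready().is_some());
    assert!(Cluster { status: None }.cluster_ready().is_none());
    println!("readiness checks pass");
}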

@@ -8,11 +8,20 @@ pub enum Error {
     #[error("Kube Error: {0}")]
     KubeError(#[source] kube::Error),
 
+    #[error("Fleet cluster lookup error: {0}")]
+    FleetClusterLookupError(#[source] kube::Error),
+
+    #[error("Fleet cluster create error: {0}")]
+    FleetClusterCreateError(#[source] kube::Error),
+
     #[error("Finalizer Error: {0}")]
     // NB: awkward type because finalizer::Error embeds the reconciler error (which is this)
     // so boxing this error to break cycles
     FinalizerError(#[source] Box<kube::runtime::finalizer::Error<Error>>),
 
+    #[error("Missing cluster namespace")]
+    ClusterNamespaceMissing,
+
     #[error("IllegalDocument")]
     IllegalDocument,
 }
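
The new variants back the match-into-`Result`, then `map_err`, then `?` shape used in `reconcile_fleet_cluster` above. A self-contained sketch of that shape, with `std::io::Error` standing in for `kube::Error` and fabricated `fake_get`/`fake_create` helpers in place of the API calls:

use std::io::{Error as IoError, ErrorKind};

#[derive(Debug, thiserror::Error)]
enum Error {
    #[error("Fleet cluster lookup error: {0}")]
    FleetClusterLookupError(#[source] IoError),
    #[error("Fleet cluster create error: {0}")]
    FleetClusterCreateError(#[source] IoError),
}

// Stand-in for `fleet_api.get(...)`: NotFound means "create it".
fn fake_get(name: &str) -> Result<String, IoError> {
    Err(IoError::new(ErrorKind::NotFound, format!("{name} not found")))
}

fn fake_create(name: &str) -> Result<String, IoError> {
    Ok(name.to_string())
}

fn lookup_or_create(name: &str) -> Result<String, Error> {
    // Match the lookup into a Result: found short-circuits, NotFound becomes
    // the object to create, anything else is a lookup failure.
    let desired = match fake_get(name) {
        Ok(existing) => return Ok(existing),
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(format!("new-{name}")),
        Err(e) => Err(e),
    }
    .map_err(Error::FleetClusterLookupError)?;
    fake_create(&desired).map_err(Error::FleetClusterCreateError)
}

fn main() {
    match lookup_or_create("docker-demo") {
        Ok(created) => println!("created {created}"),
        Err(e) => eprintln!("reconcile failed: {e}"),
    }
}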