Add ability to patch created objects to maintain the state

Signed-off-by: Danil Grigorev <danil.grigorev@suse.com>
This commit is contained in:
Danil Grigorev 2024-04-29 14:08:37 +02:00
parent 01b75af280
commit 2fe2148bfa
No known key found for this signature in database
GPG Key ID: 7C96CE1776C81090
4 changed files with 108 additions and 17 deletions

View File

@ -12,14 +12,14 @@ use kube::{api::ResourceExt, runtime::controller::Action, Resource};
use std::sync::Arc;
use super::cluster_class::CLUSTER_CLASS_LABEL;
use super::controller::{get_or_create, Context, FleetBundle, FleetController};
use super::SyncError;
use super::controller::{get_or_create, patch, Context, FleetBundle, FleetController};
use super::{ClusterSyncError, SyncError};
pub static CONTROLPLANE_READY_CONDITION: &str = "ControlPlaneReady";
pub struct FleetClusterBundle {
fleet: fleet_cluster::Cluster,
config: FleetAddonConfig
config: FleetAddonConfig,
}
impl From<&Cluster> for fleet_cluster::Cluster {
@ -60,10 +60,19 @@ impl From<&Cluster> for fleet_cluster::Cluster {
impl FleetBundle for FleetClusterBundle {
async fn sync(&self, ctx: Arc<Context>) -> Result<Action> {
get_or_create(ctx, self.fleet.clone())
get_or_create(ctx.clone(), self.fleet.clone())
.await
.map_err(SyncError::ClusterSync)
.map_err(Into::into)
.map_err(Into::<ClusterSyncError>::into)
.map_err(Into::<SyncError>::into)?;
if let Some(true) = self.config.spec.patch_resource {
patch(ctx, self.fleet.clone())
.await
.map_err(Into::<ClusterSyncError>::into)
.map_err(Into::<SyncError>::into)?;
}
Ok(Action::await_change())
}
}
@ -91,7 +100,7 @@ impl FleetController for Cluster {
fleet.metadata.owner_references = None
}
Ok(FleetClusterBundle{
Ok(FleetClusterBundle {
fleet,
config: config.clone(),
})

View File

@ -11,8 +11,8 @@ use kube::{api::ResourceExt, runtime::controller::Action, Resource};
use std::sync::Arc;
use super::controller::{get_or_create, Context, FleetBundle, FleetController};
use super::SyncError;
use super::controller::{get_or_create, patch, Context, FleetBundle, FleetController};
use super::{GroupSyncError, SyncError};
pub static CLUSTER_CLASS_LABEL: &str = "clusterclass-name.fleet.addons.cluster.x-k8s.io";
@ -54,10 +54,19 @@ impl From<&ClusterClass> for ClusterGroup {
impl FleetBundle for FleetClusterClassBundle {
async fn sync(&self, ctx: Arc<Context>) -> Result<Action> {
get_or_create(ctx, self.fleet_group.clone())
get_or_create(ctx.clone(), self.fleet_group.clone())
.await
.map_err(SyncError::GroupSync)
.map_err(Into::into)
.map_err(Into::<GroupSyncError>::into)
.map_err(Into::<SyncError>::into)?;
if let Some(true) = self.config.spec.patch_resource {
patch(ctx, self.fleet_group.clone())
.await
.map_err(Into::<GroupSyncError>::into)
.map_err(Into::<SyncError>::into)?;
}
Ok(Action::await_change())
}
}

View File

@ -1,11 +1,12 @@
use crate::api::fleet_addon_config::FleetAddonConfig;
use crate::controllers::PatchError;
use crate::metrics::Diagnostics;
use crate::{telemetry, Error, Metrics};
use chrono::Utc;
use k8s_openapi::NamespaceResourceScope;
use kube::api::PostParams;
use kube::api::{Patch, PatchParams, PostParams};
use kube::runtime::events::{Event, EventType};
use kube::runtime::finalizer;
@ -84,6 +85,51 @@ where
Ok(Action::await_change())
}
pub(crate) async fn patch<R>(ctx: Arc<Context>, res: R) -> Result<Action, PatchError>
where
R: std::fmt::Debug,
R: Clone + Serialize + DeserializeOwned,
R: kube::Resource<DynamicType = (), Scope = NamespaceResourceScope>,
R: kube::ResourceExt,
{
let ns = res
.meta()
.namespace
.clone()
.unwrap_or(String::from("default"));
let api: Api<R> = Api::namespaced(ctx.client.clone(), &ns);
api.patch(
&res.name_any(),
&PatchParams::apply("addon-provider-fleet"),
&Patch::Apply(res.clone()),
)
.await
.map_err(PatchError::Patch)?;
info!("Updated fleet object");
ctx.diagnostics
.read()
.await
.recorder(ctx.client.clone(), &res)
// Record object creation
.publish(Event {
type_: EventType::Normal,
reason: "Updated".into(),
note: Some(format!(
"Updated fleet object `{}` in `{}`",
res.name_any(),
res.namespace().unwrap_or_default()
)),
action: "Creating".into(),
secondary: None,
})
.await
.map_err(PatchError::Event)?;
Ok(Action::await_change())
}
pub(crate) trait FleetBundle {
async fn sync(&self, ctx: Arc<Context>) -> crate::Result<Action>;
}

View File

@ -2,16 +2,34 @@ use thiserror::Error;
#[derive(Error, Debug)]
pub enum SyncError {
#[error("Cluster sync error: {0}")]
ClusterSync(#[source] GetOrCreateError),
#[error("{0}")]
ClusterSync(#[from] ClusterSyncError),
#[error("Cluster group sync error: {0}")]
GroupSync(#[source] GetOrCreateError),
#[error("{0}")]
GroupSync(#[from] GroupSyncError),
#[error("Return early")]
EarlyReturn,
}
#[derive(Error, Debug)]
pub enum ClusterSyncError {
#[error("Cluster create error: {0}")]
GetOrCreateError(#[from] GetOrCreateError),
#[error("Cluster update error: {0}")]
PatchError(#[from] PatchError),
}
#[derive(Error, Debug)]
pub enum GroupSyncError {
#[error("Cluster group create error: {0}")]
GetOrCreateError(#[from] GetOrCreateError),
#[error("Cluster group update error: {0}")]
PatchError(#[from] PatchError),
}
#[derive(Error, Debug)]
pub enum GetOrCreateError {
#[error("Lookup error: {0}")]
@ -24,6 +42,15 @@ pub enum GetOrCreateError {
Event(#[from] kube::Error),
}
#[derive(Error, Debug)]
pub enum PatchError {
#[error("Patch error: {0}")]
Patch(#[source] kube::Error),
#[error("Diagnostics error: {0}")]
Event(#[from] kube::Error),
}
pub mod cluster;
pub mod cluster_class;
pub mod controller;