diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index 1ab3eedc..d5c77141 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -40,6 +40,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "github.com/fluxcd/pkg/runtime/events" + "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/predicates" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" @@ -53,6 +54,7 @@ type BucketReconciler struct { Storage *Storage EventRecorder kuberecorder.EventRecorder ExternalEventRecorder *events.Recorder + MetricsRecorder *metrics.Recorder } // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete @@ -91,6 +93,8 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { // Return the error so we retry the failed garbage collection return ctrl.Result{}, err } + // Record deleted status + r.recordReadiness(bucket, true) // Remove our finalizer from the list and update it bucket.ObjectMeta.Finalizers = removeString(bucket.ObjectMeta.Finalizers, sourcev1.SourceFinalizer) if err := r.Update(ctx, &bucket); err != nil { @@ -101,6 +105,15 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { } } + // record reconciliation duration + if r.MetricsRecorder != nil { + objRef, err := reference.GetReference(r.Scheme, &bucket) + if err != nil { + return ctrl.Result{}, err + } + defer r.MetricsRecorder.RecordDuration(*objRef, start) + } + // set initial status if resetBucket, ok := r.resetStatus(bucket); ok { bucket = resetBucket @@ -108,6 +121,7 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { log.Error(err, "unable to update status") return ctrl.Result{Requeue: true}, err } + r.recordReadiness(bucket, false) } // purge old artifacts from storage @@ -127,6 +141,7 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { // if reconciliation failed, record the failure and requeue immediately if reconcileErr != nil { r.event(reconciledBucket, events.EventSeverityError, reconcileErr.Error()) + r.recordReadiness(reconciledBucket, false) return ctrl.Result{Requeue: true}, reconcileErr } @@ -134,6 +149,7 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision { r.event(reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket)) } + r.recordReadiness(reconciledBucket, false) log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s", time.Now().Sub(start).String(), @@ -365,3 +381,26 @@ func (r *BucketReconciler) event(bucket sourcev1.Bucket, severity, msg string) { } } } + +func (r *BucketReconciler) recordReadiness(bucket sourcev1.Bucket, deleted bool) { + if r.MetricsRecorder == nil { + return + } + + objRef, err := reference.GetReference(r.Scheme, &bucket) + if err != nil { + r.Log.WithValues( + strings.ToLower(bucket.Kind), + fmt.Sprintf("%s/%s", bucket.GetNamespace(), bucket.GetName()), + ).Error(err, "unable to record readiness metric") + return + } + if rc := meta.GetCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil { + r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted) + } else { + r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{ + Type: meta.ReadyCondition, + Status: corev1.ConditionUnknown, + }, deleted) + } +} diff --git a/controllers/gitrepository_controller.go b/controllers/gitrepository_controller.go index ad231810..959c0c5a 100644 --- a/controllers/gitrepository_controller.go +++ b/controllers/gitrepository_controller.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "github.com/fluxcd/pkg/runtime/events" + "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/predicates" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" @@ -52,6 +53,7 @@ type GitRepositoryReconciler struct { Storage *Storage EventRecorder kuberecorder.EventRecorder ExternalEventRecorder *events.Recorder + MetricsRecorder *metrics.Recorder } // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete @@ -90,6 +92,8 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro // Return the error so we retry the failed garbage collection return ctrl.Result{}, err } + // Record deleted status + r.recordReadiness(repository, true) // Remove our finalizer from the list and update it repository.ObjectMeta.Finalizers = removeString(repository.ObjectMeta.Finalizers, sourcev1.SourceFinalizer) if err := r.Update(ctx, &repository); err != nil { @@ -100,6 +104,15 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro } } + // record reconciliation duration + if r.MetricsRecorder != nil { + objRef, err := reference.GetReference(r.Scheme, &repository) + if err != nil { + return ctrl.Result{}, err + } + defer r.MetricsRecorder.RecordDuration(*objRef, start) + } + // set initial status if resetRepository, ok := r.resetStatus(repository); ok { repository = resetRepository @@ -107,6 +120,7 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro log.Error(err, "unable to update status") return ctrl.Result{Requeue: true}, err } + r.recordReadiness(repository, false) } // purge old artifacts from storage @@ -126,6 +140,7 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro // if reconciliation failed, record the failure and requeue immediately if reconcileErr != nil { r.event(reconciledRepository, events.EventSeverityError, reconcileErr.Error()) + r.recordReadiness(reconciledRepository, false) return ctrl.Result{Requeue: true}, reconcileErr } @@ -133,6 +148,7 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision { r.event(reconciledRepository, events.EventSeverityInfo, sourcev1.GitRepositoryReadyMessage(reconciledRepository)) } + r.recordReadiness(reconciledRepository, false) log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s", time.Now().Sub(start).String(), @@ -325,3 +341,26 @@ func (r *GitRepositoryReconciler) event(repository sourcev1.GitRepository, sever } } } + +func (r *GitRepositoryReconciler) recordReadiness(repository sourcev1.GitRepository, deleted bool) { + if r.MetricsRecorder == nil { + return + } + + objRef, err := reference.GetReference(r.Scheme, &repository) + if err != nil { + r.Log.WithValues( + strings.ToLower(repository.Kind), + fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()), + ).Error(err, "unable to record readiness metric") + return + } + if rc := meta.GetCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil { + r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted) + } else { + r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{ + Type: meta.ReadyCondition, + Status: corev1.ConditionUnknown, + }, deleted) + } +} diff --git a/controllers/helmchart_controller.go b/controllers/helmchart_controller.go index e9f5a098..8f3166f9 100644 --- a/controllers/helmchart_controller.go +++ b/controllers/helmchart_controller.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "github.com/fluxcd/pkg/runtime/events" + "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/predicates" "github.com/fluxcd/pkg/untar" @@ -57,6 +58,7 @@ type HelmChartReconciler struct { Getters getter.Providers EventRecorder kuberecorder.EventRecorder ExternalEventRecorder *events.Recorder + MetricsRecorder *metrics.Recorder } // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch;create;update;patch;delete @@ -95,6 +97,8 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { // Return the error so we retry the failed garbage collection return ctrl.Result{}, err } + // Record deleted status + r.recordReadiness(chart, true) // Remove our finalizer from the list and update it chart.ObjectMeta.Finalizers = removeString(chart.ObjectMeta.Finalizers, sourcev1.SourceFinalizer) if err := r.Update(ctx, &chart); err != nil { @@ -105,6 +109,15 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { } } + // record reconciliation duration + if r.MetricsRecorder != nil { + objRef, err := reference.GetReference(r.Scheme, &chart) + if err != nil { + return ctrl.Result{}, err + } + defer r.MetricsRecorder.RecordDuration(*objRef, start) + } + // Conditionally set progressing condition in status resetChart, changed := r.resetStatus(chart) if changed { @@ -113,6 +126,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { log.Error(err, "unable to update status") return ctrl.Result{Requeue: true}, err } + r.recordReadiness(chart, false) } // Purge all but current artifact from storage @@ -138,6 +152,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { if err := r.Status().Update(ctx, &chart); err != nil { log.Error(err, "unable to update status") } + r.recordReadiness(chart, false) return ctrl.Result{Requeue: true}, err } @@ -164,6 +179,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { // If reconciliation failed, record the failure and requeue immediately if reconcileErr != nil { r.event(reconciledChart, events.EventSeverityError, reconcileErr.Error()) + r.recordReadiness(reconciledChart, false) return ctrl.Result{Requeue: true}, reconcileErr } @@ -171,6 +187,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { if chart.Status.Artifact == nil || reconciledChart.Status.Artifact.Revision != chart.Status.Artifact.Revision { r.event(reconciledChart, events.EventSeverityInfo, sourcev1.HelmChartReadyMessage(reconciledChart)) } + r.recordReadiness(reconciledChart, false) log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s", time.Now().Sub(start).String(), @@ -527,3 +544,26 @@ func (r *HelmChartReconciler) event(chart sourcev1.HelmChart, severity, msg stri } } } + +func (r *HelmChartReconciler) recordReadiness(chart sourcev1.HelmChart, deleted bool) { + if r.MetricsRecorder == nil { + return + } + + objRef, err := reference.GetReference(r.Scheme, &chart) + if err != nil { + r.Log.WithValues( + strings.ToLower(chart.Kind), + fmt.Sprintf("%s/%s", chart.GetNamespace(), chart.GetName()), + ).Error(err, "unable to record readiness metric") + return + } + if rc := meta.GetCondition(chart.Status.Conditions, meta.ReadyCondition); rc != nil { + r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted) + } else { + r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{ + Type: meta.ReadyCondition, + Status: corev1.ConditionUnknown, + }, deleted) + } +} diff --git a/controllers/helmrepository_controller.go b/controllers/helmrepository_controller.go index 7b170307..ad248464 100644 --- a/controllers/helmrepository_controller.go +++ b/controllers/helmrepository_controller.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/yaml" "github.com/fluxcd/pkg/runtime/events" + "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/predicates" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" @@ -53,6 +54,7 @@ type HelmRepositoryReconciler struct { Getters getter.Providers EventRecorder kuberecorder.EventRecorder ExternalEventRecorder *events.Recorder + MetricsRecorder *metrics.Recorder } // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete @@ -92,6 +94,8 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err // Return the error so we retry the failed garbage collection return ctrl.Result{}, err } + // Record deleted status + r.recordReadiness(repository, true) // Remove our finalizer from the list and update it repository.ObjectMeta.Finalizers = removeString(repository.ObjectMeta.Finalizers, sourcev1.SourceFinalizer) if err := r.Update(ctx, &repository); err != nil { @@ -102,6 +106,15 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err } } + // record reconciliation duration + if r.MetricsRecorder != nil { + objRef, err := reference.GetReference(r.Scheme, &repository) + if err != nil { + return ctrl.Result{}, err + } + defer r.MetricsRecorder.RecordDuration(*objRef, start) + } + // set initial status if resetRepository, ok := r.resetStatus(repository); ok { repository = resetRepository @@ -109,6 +122,7 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err log.Error(err, "unable to update status") return ctrl.Result{Requeue: true}, err } + r.recordReadiness(repository, false) } // purge old artifacts from storage @@ -128,6 +142,7 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err // if reconciliation failed, record the failure and requeue immediately if reconcileErr != nil { r.event(reconciledRepository, events.EventSeverityError, reconcileErr.Error()) + r.recordReadiness(reconciledRepository, false) return ctrl.Result{Requeue: true}, reconcileErr } @@ -135,6 +150,7 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision { r.event(reconciledRepository, events.EventSeverityInfo, sourcev1.HelmRepositoryReadyMessage(reconciledRepository)) } + r.recordReadiness(reconciledRepository, false) log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s", time.Now().Sub(start).String(), @@ -299,3 +315,26 @@ func (r *HelmRepositoryReconciler) event(repository sourcev1.HelmRepository, sev } } } + +func (r *HelmRepositoryReconciler) recordReadiness(repository sourcev1.HelmRepository, deleted bool) { + if r.MetricsRecorder == nil { + return + } + + objRef, err := reference.GetReference(r.Scheme, &repository) + if err != nil { + r.Log.WithValues( + strings.ToLower(repository.Kind), + fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()), + ).Error(err, "unable to record readiness metric") + return + } + if rc := meta.GetCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil { + r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted) + } else { + r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{ + Type: meta.ReadyCondition, + Status: corev1.ConditionUnknown, + }, deleted) + } +} diff --git a/main.go b/main.go index 410dc3f5..86220c41 100644 --- a/main.go +++ b/main.go @@ -32,9 +32,11 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" + crtlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/logger" + "github.com/fluxcd/pkg/runtime/metrics" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/controllers" @@ -98,6 +100,9 @@ func main() { } } + metricsRecorder := metrics.NewRecorder() + crtlmetrics.Registry.MustRegister(metricsRecorder.Collectors()...) + watchNamespace := "" if !watchAllNamespaces { watchNamespace = os.Getenv("RUNTIME_NAMESPACE") @@ -128,6 +133,7 @@ func main() { Storage: storage, EventRecorder: mgr.GetEventRecorderFor("source-controller"), ExternalEventRecorder: eventRecorder, + MetricsRecorder: metricsRecorder, }).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{ MaxConcurrentReconciles: concurrent, }); err != nil { @@ -142,6 +148,7 @@ func main() { Getters: getters, EventRecorder: mgr.GetEventRecorderFor("source-controller"), ExternalEventRecorder: eventRecorder, + MetricsRecorder: metricsRecorder, }).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{ MaxConcurrentReconciles: concurrent, }); err != nil { @@ -156,6 +163,7 @@ func main() { Getters: getters, EventRecorder: mgr.GetEventRecorderFor("source-controller"), ExternalEventRecorder: eventRecorder, + MetricsRecorder: metricsRecorder, }).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{ MaxConcurrentReconciles: concurrent, }); err != nil { @@ -169,6 +177,7 @@ func main() { Storage: storage, EventRecorder: mgr.GetEventRecorderFor("source-controller"), ExternalEventRecorder: eventRecorder, + MetricsRecorder: metricsRecorder, }).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{ MaxConcurrentReconciles: concurrent, }); err != nil {