Implement Prometheus instrumentation

Signed-off-by: Stefan Prodan <stefan.prodan@gmail.com>
This commit is contained in:
Stefan Prodan 2020-10-13 13:54:35 +03:00
parent c8c2eec3a6
commit 03e32491bf
No known key found for this signature in database
GPG Key ID: 3299AEB0E4085BAF
5 changed files with 166 additions and 0 deletions

View File

@ -40,6 +40,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@ -53,6 +54,7 @@ type BucketReconciler struct {
Storage *Storage
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *events.Recorder
MetricsRecorder *metrics.Recorder
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
@ -91,6 +93,8 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}
// Record deleted status
r.recordReadiness(bucket, true)
// Remove our finalizer from the list and update it
bucket.ObjectMeta.Finalizers = removeString(bucket.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &bucket); err != nil {
@ -101,6 +105,15 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
}
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &bucket)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, start)
}
// set initial status
if resetBucket, ok := r.resetStatus(bucket); ok {
bucket = resetBucket
@ -108,6 +121,7 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(bucket, false)
}
// purge old artifacts from storage
@ -127,6 +141,7 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(reconciledBucket, events.EventSeverityError, reconcileErr.Error())
r.recordReadiness(reconciledBucket, false)
return ctrl.Result{Requeue: true}, reconcileErr
}
@ -134,6 +149,7 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision {
r.event(reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
}
r.recordReadiness(reconciledBucket, false)
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
@ -365,3 +381,26 @@ func (r *BucketReconciler) event(bucket sourcev1.Bucket, severity, msg string) {
}
}
}
func (r *BucketReconciler) recordReadiness(bucket sourcev1.Bucket, deleted bool) {
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &bucket)
if err != nil {
r.Log.WithValues(
strings.ToLower(bucket.Kind),
fmt.Sprintf("%s/%s", bucket.GetNamespace(), bucket.GetName()),
).Error(err, "unable to record readiness metric")
return
}
if rc := meta.GetCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
} else {
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
Type: meta.ReadyCondition,
Status: corev1.ConditionUnknown,
}, deleted)
}
}

View File

@ -38,6 +38,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@ -52,6 +53,7 @@ type GitRepositoryReconciler struct {
Storage *Storage
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *events.Recorder
MetricsRecorder *metrics.Recorder
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
@ -90,6 +92,8 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}
// Record deleted status
r.recordReadiness(repository, true)
// Remove our finalizer from the list and update it
repository.ObjectMeta.Finalizers = removeString(repository.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &repository); err != nil {
@ -100,6 +104,15 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
}
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, start)
}
// set initial status
if resetRepository, ok := r.resetStatus(repository); ok {
repository = resetRepository
@ -107,6 +120,7 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(repository, false)
}
// purge old artifacts from storage
@ -126,6 +140,7 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(reconciledRepository, events.EventSeverityError, reconcileErr.Error())
r.recordReadiness(reconciledRepository, false)
return ctrl.Result{Requeue: true}, reconcileErr
}
@ -133,6 +148,7 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
r.event(reconciledRepository, events.EventSeverityInfo, sourcev1.GitRepositoryReadyMessage(reconciledRepository))
}
r.recordReadiness(reconciledRepository, false)
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
@ -325,3 +341,26 @@ func (r *GitRepositoryReconciler) event(repository sourcev1.GitRepository, sever
}
}
}
func (r *GitRepositoryReconciler) recordReadiness(repository sourcev1.GitRepository, deleted bool) {
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
r.Log.WithValues(
strings.ToLower(repository.Kind),
fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()),
).Error(err, "unable to record readiness metric")
return
}
if rc := meta.GetCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
} else {
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
Type: meta.ReadyCondition,
Status: corev1.ConditionUnknown,
}, deleted)
}
}

View File

@ -41,6 +41,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
"github.com/fluxcd/pkg/untar"
@ -57,6 +58,7 @@ type HelmChartReconciler struct {
Getters getter.Providers
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *events.Recorder
MetricsRecorder *metrics.Recorder
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch;create;update;patch;delete
@ -95,6 +97,8 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}
// Record deleted status
r.recordReadiness(chart, true)
// Remove our finalizer from the list and update it
chart.ObjectMeta.Finalizers = removeString(chart.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &chart); err != nil {
@ -105,6 +109,15 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
}
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &chart)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, start)
}
// Conditionally set progressing condition in status
resetChart, changed := r.resetStatus(chart)
if changed {
@ -113,6 +126,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(chart, false)
}
// Purge all but current artifact from storage
@ -138,6 +152,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
if err := r.Status().Update(ctx, &chart); err != nil {
log.Error(err, "unable to update status")
}
r.recordReadiness(chart, false)
return ctrl.Result{Requeue: true}, err
}
@ -164,6 +179,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// If reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(reconciledChart, events.EventSeverityError, reconcileErr.Error())
r.recordReadiness(reconciledChart, false)
return ctrl.Result{Requeue: true}, reconcileErr
}
@ -171,6 +187,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
if chart.Status.Artifact == nil || reconciledChart.Status.Artifact.Revision != chart.Status.Artifact.Revision {
r.event(reconciledChart, events.EventSeverityInfo, sourcev1.HelmChartReadyMessage(reconciledChart))
}
r.recordReadiness(reconciledChart, false)
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
@ -527,3 +544,26 @@ func (r *HelmChartReconciler) event(chart sourcev1.HelmChart, severity, msg stri
}
}
}
func (r *HelmChartReconciler) recordReadiness(chart sourcev1.HelmChart, deleted bool) {
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &chart)
if err != nil {
r.Log.WithValues(
strings.ToLower(chart.Kind),
fmt.Sprintf("%s/%s", chart.GetNamespace(), chart.GetName()),
).Error(err, "unable to record readiness metric")
return
}
if rc := meta.GetCondition(chart.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
} else {
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
Type: meta.ReadyCondition,
Status: corev1.ConditionUnknown,
}, deleted)
}
}

View File

@ -38,6 +38,7 @@ import (
"sigs.k8s.io/yaml"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@ -53,6 +54,7 @@ type HelmRepositoryReconciler struct {
Getters getter.Providers
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *events.Recorder
MetricsRecorder *metrics.Recorder
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete
@ -92,6 +94,8 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}
// Record deleted status
r.recordReadiness(repository, true)
// Remove our finalizer from the list and update it
repository.ObjectMeta.Finalizers = removeString(repository.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &repository); err != nil {
@ -102,6 +106,15 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
}
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, start)
}
// set initial status
if resetRepository, ok := r.resetStatus(repository); ok {
repository = resetRepository
@ -109,6 +122,7 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(repository, false)
}
// purge old artifacts from storage
@ -128,6 +142,7 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(reconciledRepository, events.EventSeverityError, reconcileErr.Error())
r.recordReadiness(reconciledRepository, false)
return ctrl.Result{Requeue: true}, reconcileErr
}
@ -135,6 +150,7 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
r.event(reconciledRepository, events.EventSeverityInfo, sourcev1.HelmRepositoryReadyMessage(reconciledRepository))
}
r.recordReadiness(reconciledRepository, false)
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
@ -299,3 +315,26 @@ func (r *HelmRepositoryReconciler) event(repository sourcev1.HelmRepository, sev
}
}
}
func (r *HelmRepositoryReconciler) recordReadiness(repository sourcev1.HelmRepository, deleted bool) {
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
r.Log.WithValues(
strings.ToLower(repository.Kind),
fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()),
).Error(err, "unable to record readiness metric")
return
}
if rc := meta.GetCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
} else {
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
Type: meta.ReadyCondition,
Status: corev1.ConditionUnknown,
}, deleted)
}
}

View File

@ -32,9 +32,11 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"
crtlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/logger"
"github.com/fluxcd/pkg/runtime/metrics"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/controllers"
@ -98,6 +100,9 @@ func main() {
}
}
metricsRecorder := metrics.NewRecorder()
crtlmetrics.Registry.MustRegister(metricsRecorder.Collectors()...)
watchNamespace := ""
if !watchAllNamespaces {
watchNamespace = os.Getenv("RUNTIME_NAMESPACE")
@ -128,6 +133,7 @@ func main() {
Storage: storage,
EventRecorder: mgr.GetEventRecorderFor("source-controller"),
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
@ -142,6 +148,7 @@ func main() {
Getters: getters,
EventRecorder: mgr.GetEventRecorderFor("source-controller"),
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
@ -156,6 +163,7 @@ func main() {
Getters: getters,
EventRecorder: mgr.GetEventRecorderFor("source-controller"),
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
@ -169,6 +177,7 @@ func main() {
Storage: storage,
EventRecorder: mgr.GetEventRecorderFor("source-controller"),
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {