Merge pull request #166 from fluxcd/metrics

Implement Prometheus instrumentation
This commit is contained in:
Stefan Prodan 2020-10-13 15:00:03 +03:00 committed by GitHub
commit 281a0ee4f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 194 additions and 31 deletions

View File

@ -39,7 +39,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"github.com/fluxcd/pkg/recorder"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@ -52,7 +53,8 @@ type BucketReconciler struct {
Scheme *runtime.Scheme
Storage *Storage
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *recorder.EventRecorder
ExternalEventRecorder *events.Recorder
MetricsRecorder *metrics.Recorder
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
@ -87,10 +89,12 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
if containsString(bucket.ObjectMeta.Finalizers, sourcev1.SourceFinalizer) {
// Our finalizer is still present, so lets handle garbage collection
if err := r.gc(bucket, true); err != nil {
r.event(bucket, recorder.EventSeverityError, fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
r.event(bucket, events.EventSeverityError, fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}
// Record deleted status
r.recordReadiness(bucket, true)
// Remove our finalizer from the list and update it
bucket.ObjectMeta.Finalizers = removeString(bucket.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &bucket); err != nil {
@ -101,6 +105,15 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
}
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &bucket)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, start)
}
// set initial status
if resetBucket, ok := r.resetStatus(bucket); ok {
bucket = resetBucket
@ -108,6 +121,7 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(bucket, false)
}
// purge old artifacts from storage
@ -126,14 +140,16 @@ func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(reconciledBucket, recorder.EventSeverityError, reconcileErr.Error())
r.event(reconciledBucket, events.EventSeverityError, reconcileErr.Error())
r.recordReadiness(reconciledBucket, false)
return ctrl.Result{Requeue: true}, reconcileErr
}
// emit revision change event
if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision {
r.event(reconciledBucket, recorder.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
r.event(reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
}
r.recordReadiness(reconciledBucket, false)
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
@ -365,3 +381,26 @@ func (r *BucketReconciler) event(bucket sourcev1.Bucket, severity, msg string) {
}
}
}
func (r *BucketReconciler) recordReadiness(bucket sourcev1.Bucket, deleted bool) {
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &bucket)
if err != nil {
r.Log.WithValues(
strings.ToLower(bucket.Kind),
fmt.Sprintf("%s/%s", bucket.GetNamespace(), bucket.GetName()),
).Error(err, "unable to record readiness metric")
return
}
if rc := meta.GetCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
} else {
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
Type: meta.ReadyCondition,
Status: corev1.ConditionUnknown,
}, deleted)
}
}

View File

@ -37,7 +37,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"github.com/fluxcd/pkg/recorder"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@ -51,7 +52,8 @@ type GitRepositoryReconciler struct {
Scheme *runtime.Scheme
Storage *Storage
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *recorder.EventRecorder
ExternalEventRecorder *events.Recorder
MetricsRecorder *metrics.Recorder
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
@ -86,10 +88,12 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
if containsString(repository.ObjectMeta.Finalizers, sourcev1.SourceFinalizer) {
// Our finalizer is still present, so lets handle garbage collection
if err := r.gc(repository, true); err != nil {
r.event(repository, recorder.EventSeverityError, fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
r.event(repository, events.EventSeverityError, fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}
// Record deleted status
r.recordReadiness(repository, true)
// Remove our finalizer from the list and update it
repository.ObjectMeta.Finalizers = removeString(repository.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &repository); err != nil {
@ -100,6 +104,15 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
}
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, start)
}
// set initial status
if resetRepository, ok := r.resetStatus(repository); ok {
repository = resetRepository
@ -107,6 +120,7 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(repository, false)
}
// purge old artifacts from storage
@ -125,14 +139,16 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(reconciledRepository, recorder.EventSeverityError, reconcileErr.Error())
r.event(reconciledRepository, events.EventSeverityError, reconcileErr.Error())
r.recordReadiness(reconciledRepository, false)
return ctrl.Result{Requeue: true}, reconcileErr
}
// emit revision change event
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
r.event(reconciledRepository, recorder.EventSeverityInfo, sourcev1.GitRepositoryReadyMessage(reconciledRepository))
r.event(reconciledRepository, events.EventSeverityInfo, sourcev1.GitRepositoryReadyMessage(reconciledRepository))
}
r.recordReadiness(reconciledRepository, false)
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
@ -325,3 +341,26 @@ func (r *GitRepositoryReconciler) event(repository sourcev1.GitRepository, sever
}
}
}
func (r *GitRepositoryReconciler) recordReadiness(repository sourcev1.GitRepository, deleted bool) {
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
r.Log.WithValues(
strings.ToLower(repository.Kind),
fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()),
).Error(err, "unable to record readiness metric")
return
}
if rc := meta.GetCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
} else {
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
Type: meta.ReadyCondition,
Status: corev1.ConditionUnknown,
}, deleted)
}
}

View File

@ -40,7 +40,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"github.com/fluxcd/pkg/recorder"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
"github.com/fluxcd/pkg/untar"
@ -56,7 +57,8 @@ type HelmChartReconciler struct {
Storage *Storage
Getters getter.Providers
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *recorder.EventRecorder
ExternalEventRecorder *events.Recorder
MetricsRecorder *metrics.Recorder
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch;create;update;patch;delete
@ -91,10 +93,12 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
if containsString(chart.ObjectMeta.Finalizers, sourcev1.SourceFinalizer) {
// Our finalizer is still present, so lets handle garbage collection
if err := r.gc(chart, true); err != nil {
r.event(chart, recorder.EventSeverityError, fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
r.event(chart, events.EventSeverityError, fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}
// Record deleted status
r.recordReadiness(chart, true)
// Remove our finalizer from the list and update it
chart.ObjectMeta.Finalizers = removeString(chart.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &chart); err != nil {
@ -105,6 +109,15 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
}
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &chart)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, start)
}
// Conditionally set progressing condition in status
resetChart, changed := r.resetStatus(chart)
if changed {
@ -113,6 +126,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(chart, false)
}
// Purge all but current artifact from storage
@ -138,6 +152,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
if err := r.Status().Update(ctx, &chart); err != nil {
log.Error(err, "unable to update status")
}
r.recordReadiness(chart, false)
return ctrl.Result{Requeue: true}, err
}
@ -163,14 +178,16 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// If reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(reconciledChart, recorder.EventSeverityError, reconcileErr.Error())
r.event(reconciledChart, events.EventSeverityError, reconcileErr.Error())
r.recordReadiness(reconciledChart, false)
return ctrl.Result{Requeue: true}, reconcileErr
}
// Emit an event if we did not have an artifact before, or the revision has changed
if chart.Status.Artifact == nil || reconciledChart.Status.Artifact.Revision != chart.Status.Artifact.Revision {
r.event(reconciledChart, recorder.EventSeverityInfo, sourcev1.HelmChartReadyMessage(reconciledChart))
r.event(reconciledChart, events.EventSeverityInfo, sourcev1.HelmChartReadyMessage(reconciledChart))
}
r.recordReadiness(reconciledChart, false)
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
@ -527,3 +544,26 @@ func (r *HelmChartReconciler) event(chart sourcev1.HelmChart, severity, msg stri
}
}
}
func (r *HelmChartReconciler) recordReadiness(chart sourcev1.HelmChart, deleted bool) {
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &chart)
if err != nil {
r.Log.WithValues(
strings.ToLower(chart.Kind),
fmt.Sprintf("%s/%s", chart.GetNamespace(), chart.GetName()),
).Error(err, "unable to record readiness metric")
return
}
if rc := meta.GetCondition(chart.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
} else {
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
Type: meta.ReadyCondition,
Status: corev1.ConditionUnknown,
}, deleted)
}
}

View File

@ -37,7 +37,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/yaml"
"github.com/fluxcd/pkg/recorder"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@ -52,7 +53,8 @@ type HelmRepositoryReconciler struct {
Storage *Storage
Getters getter.Providers
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *recorder.EventRecorder
ExternalEventRecorder *events.Recorder
MetricsRecorder *metrics.Recorder
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete
@ -88,10 +90,12 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
if containsString(repository.ObjectMeta.Finalizers, sourcev1.SourceFinalizer) {
// Our finalizer is still present, so lets handle garbage collection
if err := r.gc(repository, true); err != nil {
r.event(repository, recorder.EventSeverityError, fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
r.event(repository, events.EventSeverityError, fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}
// Record deleted status
r.recordReadiness(repository, true)
// Remove our finalizer from the list and update it
repository.ObjectMeta.Finalizers = removeString(repository.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &repository); err != nil {
@ -102,6 +106,15 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
}
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, start)
}
// set initial status
if resetRepository, ok := r.resetStatus(repository); ok {
repository = resetRepository
@ -109,6 +122,7 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(repository, false)
}
// purge old artifacts from storage
@ -127,14 +141,16 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(reconciledRepository, recorder.EventSeverityError, reconcileErr.Error())
r.event(reconciledRepository, events.EventSeverityError, reconcileErr.Error())
r.recordReadiness(reconciledRepository, false)
return ctrl.Result{Requeue: true}, reconcileErr
}
// emit revision change event
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
r.event(reconciledRepository, recorder.EventSeverityInfo, sourcev1.HelmRepositoryReadyMessage(reconciledRepository))
r.event(reconciledRepository, events.EventSeverityInfo, sourcev1.HelmRepositoryReadyMessage(reconciledRepository))
}
r.recordReadiness(reconciledRepository, false)
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
@ -299,3 +315,26 @@ func (r *HelmRepositoryReconciler) event(repository sourcev1.HelmRepository, sev
}
}
}
func (r *HelmRepositoryReconciler) recordReadiness(repository sourcev1.HelmRepository, deleted bool) {
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
r.Log.WithValues(
strings.ToLower(repository.Kind),
fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()),
).Error(err, "unable to record readiness metric")
return
}
if rc := meta.GetCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, deleted)
} else {
r.MetricsRecorder.RecordCondition(*objRef, meta.Condition{
Type: meta.ReadyCondition,
Status: corev1.ConditionUnknown,
}, deleted)
}
}

3
go.mod
View File

@ -10,8 +10,7 @@ require (
github.com/fluxcd/pkg/gittestserver v0.0.2
github.com/fluxcd/pkg/helmtestserver v0.0.1
github.com/fluxcd/pkg/lockedfile v0.0.5
github.com/fluxcd/pkg/recorder v0.0.6
github.com/fluxcd/pkg/runtime v0.0.6
github.com/fluxcd/pkg/runtime v0.1.0
github.com/fluxcd/pkg/ssh v0.0.5
github.com/fluxcd/pkg/untar v0.0.5
github.com/fluxcd/source-controller/api v0.1.0

10
go.sum
View File

@ -212,10 +212,8 @@ github.com/fluxcd/pkg/helmtestserver v0.0.1 h1:8RcLZdg7Zr9ZqyijsIIASjjMXQtF4UWP4
github.com/fluxcd/pkg/helmtestserver v0.0.1/go.mod h1:GR8LriiU7PqZSTH4Xe6Cimpig2VVPB29PeUXJjNJYfA=
github.com/fluxcd/pkg/lockedfile v0.0.5 h1:C3T8wfdff1UY1bvplmCkGOLrdMWJHO8Q8+tdlEXJYzQ=
github.com/fluxcd/pkg/lockedfile v0.0.5/go.mod h1:uAtPUBId6a2RqO84MTH5HKGX0SbM1kNW3Wr/FhYyDVA=
github.com/fluxcd/pkg/recorder v0.0.6 h1:me/n8syeeGXz50OXoPX3jgIj9AtinvhHdKT9Dy+MbHs=
github.com/fluxcd/pkg/recorder v0.0.6/go.mod h1:IfQxfVRSNsWs3B0Yp5B6ObEWwKHILlAx8N7XkoDdhFg=
github.com/fluxcd/pkg/runtime v0.0.6 h1:m7qwr2wRePs1vzVlM0Y88vitXSsv1lb3QCJflRpa3qQ=
github.com/fluxcd/pkg/runtime v0.0.6/go.mod h1:iLjncjktQVpqpb1NsY2fW+UYDFOtVyt+yJrxqrrK8A0=
github.com/fluxcd/pkg/runtime v0.1.0 h1:mCLj5GlQZqWtK3tvtZTmfgFOLsTUY1iqg3CmEyS1nRs=
github.com/fluxcd/pkg/runtime v0.1.0/go.mod h1:OXkrYtDLw3GhclbzvnzfSktUyxRmC3FFhXj0tVVaIX8=
github.com/fluxcd/pkg/ssh v0.0.5 h1:rnbFZ7voy2JBlUfMbfyqArX2FYaLNpDhccGFC3qW83A=
github.com/fluxcd/pkg/ssh v0.0.5/go.mod h1:7jXPdXZpc0ttMNz2kD9QuMi3RNn/e0DOFbj0Tij/+Hs=
github.com/fluxcd/pkg/testserver v0.0.2 h1:SoaMtO9cE5p/wl2zkGudzflnEHd9mk68CGjZOo7w0Uk=
@ -407,8 +405,8 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-retryablehttp v0.6.6 h1:HJunrbHTDDbBb/ay4kxa1n+dLmttUlnP3V9oNE4hmsM=
github.com/hashicorp/go-retryablehttp v0.6.6/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-retryablehttp v0.6.7 h1:8/CAEZt/+F7kR7GevNHulKkUjLht3CPmn7egmhieNKo=
github.com/hashicorp/go-retryablehttp v0.6.7/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=

15
main.go
View File

@ -32,9 +32,11 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"
crtlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"github.com/fluxcd/pkg/recorder"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/logger"
"github.com/fluxcd/pkg/runtime/metrics"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/controllers"
@ -88,9 +90,9 @@ func main() {
ctrl.SetLogger(logger.NewLogger(logLevel, logJSON))
var eventRecorder *recorder.EventRecorder
var eventRecorder *events.Recorder
if eventsAddr != "" {
if er, err := recorder.NewEventRecorder(eventsAddr, "source-controller"); err != nil {
if er, err := events.NewRecorder(eventsAddr, "source-controller"); err != nil {
setupLog.Error(err, "unable to create event recorder")
os.Exit(1)
} else {
@ -98,6 +100,9 @@ func main() {
}
}
metricsRecorder := metrics.NewRecorder()
crtlmetrics.Registry.MustRegister(metricsRecorder.Collectors()...)
watchNamespace := ""
if !watchAllNamespaces {
watchNamespace = os.Getenv("RUNTIME_NAMESPACE")
@ -128,6 +133,7 @@ func main() {
Storage: storage,
EventRecorder: mgr.GetEventRecorderFor("source-controller"),
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
@ -142,6 +148,7 @@ func main() {
Getters: getters,
EventRecorder: mgr.GetEventRecorderFor("source-controller"),
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
@ -156,6 +163,7 @@ func main() {
Getters: getters,
EventRecorder: mgr.GetEventRecorderFor("source-controller"),
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
@ -169,6 +177,7 @@ func main() {
Storage: storage,
EventRecorder: mgr.GetEventRecorderFor("source-controller"),
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {