diff --git a/controllers/gitrepository_controller.go b/controllers/gitrepository_controller.go index b76fb1b1..2f968c8a 100644 --- a/controllers/gitrepository_controller.go +++ b/controllers/gitrepository_controller.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1" intgit "github.com/fluxcd/source-controller/internal/git" @@ -95,11 +96,20 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro return ctrl.Result{RequeueAfter: repo.GetInterval().Duration}, nil } +type GitRepositoryReconcilerOptions struct { + MaxConcurrentReconciles int +} + func (r *GitRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error { + return r.SetupWithManagerAndOptions(mgr, GitRepositoryReconcilerOptions{}) +} + +func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts GitRepositoryReconcilerOptions) error { return ctrl.NewControllerManagedBy(mgr). For(&sourcev1.GitRepository{}). WithEventFilter(SourceChangePredicate{}). WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}). + WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}). Complete(r) } diff --git a/controllers/helmchart_controller.go b/controllers/helmchart_controller.go index b726ff36..1ab4114d 100644 --- a/controllers/helmchart_controller.go +++ b/controllers/helmchart_controller.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/yaml" sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1" @@ -114,11 +115,20 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{RequeueAfter: chart.GetInterval().Duration}, nil } +type HelmChartReconcilerOptions struct { + MaxConcurrentReconciles int +} + func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error { + return r.SetupWithManagerAndOptions(mgr, HelmChartReconcilerOptions{}) +} + +func (r *HelmChartReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts HelmChartReconcilerOptions) error { return ctrl.NewControllerManagedBy(mgr). For(&sourcev1.HelmChart{}). WithEventFilter(SourceChangePredicate{}). WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}). + WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}). Complete(r) } diff --git a/controllers/helmrepository_controller.go b/controllers/helmrepository_controller.go index 15f24ec2..3b097438 100644 --- a/controllers/helmrepository_controller.go +++ b/controllers/helmrepository_controller.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/yaml" sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1" @@ -99,11 +100,20 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err return ctrl.Result{RequeueAfter: repository.GetInterval().Duration}, nil } +type HelmRepositoryReconcilerOptions struct { + MaxConcurrentReconciles int +} + func (r *HelmRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error { + return r.SetupWithManagerAndOptions(mgr, HelmRepositoryReconcilerOptions{}) +} + +func (r *HelmRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts HelmRepositoryReconcilerOptions) error { return ctrl.NewControllerManagedBy(mgr). For(&sourcev1.HelmRepository{}). WithEventFilter(SourceChangePredicate{}). WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}). + WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}). Complete(r) } diff --git a/main.go b/main.go index 51fd6e85..00372d16 100644 --- a/main.go +++ b/main.go @@ -57,16 +57,21 @@ func init() { } func main() { - var metricsAddr string - var enableLeaderElection bool - var storagePath string - var storageAddr string + var ( + metricsAddr string + enableLeaderElection bool + storagePath string + storageAddr string + concurrent int + ) + flag.StringVar(&metricsAddr, "metrics-addr", ":9090", "The address the metric endpoint binds to.") flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") flag.StringVar(&storagePath, "storage-path", "", "The local storage path.") flag.StringVar(&storageAddr, "storage-addr", ":8080", "The address the static file server binds to.") + flag.IntVar(&concurrent, "concurrent", 2, "The number of concurrent reconciles per controller.") flag.Parse() @@ -93,7 +98,9 @@ func main() { Log: ctrl.Log.WithName("controllers").WithName("GitRepository"), Scheme: mgr.GetScheme(), Storage: storage, - }).SetupWithManager(mgr); err != nil { + }).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{ + MaxConcurrentReconciles: concurrent, + }); err != nil { setupLog.Error(err, "unable to create controller", "controller", "GitRepository") os.Exit(1) } @@ -103,7 +110,9 @@ func main() { Scheme: mgr.GetScheme(), Storage: storage, Getters: getters, - }).SetupWithManager(mgr); err != nil { + }).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{ + MaxConcurrentReconciles: concurrent, + }); err != nil { setupLog.Error(err, "unable to create controller", "controller", "HelmRepository") os.Exit(1) } @@ -113,7 +122,9 @@ func main() { Scheme: mgr.GetScheme(), Storage: storage, Getters: getters, - }).SetupWithManager(mgr); err != nil { + }).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{ + MaxConcurrentReconciles: concurrent, + }); err != nil { setupLog.Error(err, "unable to create controller", "controller", "HelmChart") os.Exit(1) }