controllers: make concurrent reconciles config opt

Introduces new helpers and config structs to all reconcilers to
set the max concurrent reconciles number.

Introduces a new flag `--concurrent` to configure the number of
concurrent reconciles per reconciler, defaults to `2`.
This commit is contained in:
Hidde Beydals 2020-04-19 11:44:28 +02:00
parent 2f1390dda3
commit 5b77100589
4 changed files with 48 additions and 7 deletions

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
intgit "github.com/fluxcd/source-controller/internal/git"
@ -95,11 +96,20 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
return ctrl.Result{RequeueAfter: repo.GetInterval().Duration}, nil
}
type GitRepositoryReconcilerOptions struct {
MaxConcurrentReconciles int
}
func (r *GitRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(mgr, GitRepositoryReconcilerOptions{})
}
func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts GitRepositoryReconcilerOptions) error {
return ctrl.NewControllerManagedBy(mgr).
For(&sourcev1.GitRepository{}).
WithEventFilter(SourceChangePredicate{}).
WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}).
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
Complete(r)
}

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/yaml"
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
@ -114,11 +115,20 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{RequeueAfter: chart.GetInterval().Duration}, nil
}
type HelmChartReconcilerOptions struct {
MaxConcurrentReconciles int
}
func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(mgr, HelmChartReconcilerOptions{})
}
func (r *HelmChartReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts HelmChartReconcilerOptions) error {
return ctrl.NewControllerManagedBy(mgr).
For(&sourcev1.HelmChart{}).
WithEventFilter(SourceChangePredicate{}).
WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}).
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
Complete(r)
}

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/yaml"
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
@ -99,11 +100,20 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
return ctrl.Result{RequeueAfter: repository.GetInterval().Duration}, nil
}
type HelmRepositoryReconcilerOptions struct {
MaxConcurrentReconciles int
}
func (r *HelmRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(mgr, HelmRepositoryReconcilerOptions{})
}
func (r *HelmRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts HelmRepositoryReconcilerOptions) error {
return ctrl.NewControllerManagedBy(mgr).
For(&sourcev1.HelmRepository{}).
WithEventFilter(SourceChangePredicate{}).
WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}).
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
Complete(r)
}

25
main.go
View File

@ -57,16 +57,21 @@ func init() {
}
func main() {
var metricsAddr string
var enableLeaderElection bool
var storagePath string
var storageAddr string
var (
metricsAddr string
enableLeaderElection bool
storagePath string
storageAddr string
concurrent int
)
flag.StringVar(&metricsAddr, "metrics-addr", ":9090", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.StringVar(&storagePath, "storage-path", "", "The local storage path.")
flag.StringVar(&storageAddr, "storage-addr", ":8080", "The address the static file server binds to.")
flag.IntVar(&concurrent, "concurrent", 2, "The number of concurrent reconciles per controller.")
flag.Parse()
@ -93,7 +98,9 @@ func main() {
Log: ctrl.Log.WithName("controllers").WithName("GitRepository"),
Scheme: mgr.GetScheme(),
Storage: storage,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "GitRepository")
os.Exit(1)
}
@ -103,7 +110,9 @@ func main() {
Scheme: mgr.GetScheme(),
Storage: storage,
Getters: getters,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "HelmRepository")
os.Exit(1)
}
@ -113,7 +122,9 @@ func main() {
Scheme: mgr.GetScheme(),
Storage: storage,
Getters: getters,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "HelmChart")
os.Exit(1)
}