Merge pull request #30 from fluxcd/concurrent-reconciles
controllers: make concurrent reconciles config opt
This commit is contained in:
commit
d77ae99678
|
@ -34,6 +34,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
ctrl "sigs.k8s.io/controller-runtime"
|
ctrl "sigs.k8s.io/controller-runtime"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||||
|
|
||||||
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
|
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
|
||||||
intgit "github.com/fluxcd/source-controller/internal/git"
|
intgit "github.com/fluxcd/source-controller/internal/git"
|
||||||
|
@ -95,11 +96,20 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
|
||||||
return ctrl.Result{RequeueAfter: repo.GetInterval().Duration}, nil
|
return ctrl.Result{RequeueAfter: repo.GetInterval().Duration}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type GitRepositoryReconcilerOptions struct {
|
||||||
|
MaxConcurrentReconciles int
|
||||||
|
}
|
||||||
|
|
||||||
func (r *GitRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
func (r *GitRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||||
|
return r.SetupWithManagerAndOptions(mgr, GitRepositoryReconcilerOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts GitRepositoryReconcilerOptions) error {
|
||||||
return ctrl.NewControllerManagedBy(mgr).
|
return ctrl.NewControllerManagedBy(mgr).
|
||||||
For(&sourcev1.GitRepository{}).
|
For(&sourcev1.GitRepository{}).
|
||||||
WithEventFilter(SourceChangePredicate{}).
|
WithEventFilter(SourceChangePredicate{}).
|
||||||
WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}).
|
WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}).
|
||||||
|
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
|
||||||
Complete(r)
|
Complete(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
ctrl "sigs.k8s.io/controller-runtime"
|
ctrl "sigs.k8s.io/controller-runtime"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||||
"sigs.k8s.io/yaml"
|
"sigs.k8s.io/yaml"
|
||||||
|
|
||||||
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
|
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
|
||||||
|
@ -114,11 +115,20 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
|
||||||
return ctrl.Result{RequeueAfter: chart.GetInterval().Duration}, nil
|
return ctrl.Result{RequeueAfter: chart.GetInterval().Duration}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type HelmChartReconcilerOptions struct {
|
||||||
|
MaxConcurrentReconciles int
|
||||||
|
}
|
||||||
|
|
||||||
func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||||
|
return r.SetupWithManagerAndOptions(mgr, HelmChartReconcilerOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *HelmChartReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts HelmChartReconcilerOptions) error {
|
||||||
return ctrl.NewControllerManagedBy(mgr).
|
return ctrl.NewControllerManagedBy(mgr).
|
||||||
For(&sourcev1.HelmChart{}).
|
For(&sourcev1.HelmChart{}).
|
||||||
WithEventFilter(SourceChangePredicate{}).
|
WithEventFilter(SourceChangePredicate{}).
|
||||||
WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}).
|
WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}).
|
||||||
|
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
|
||||||
Complete(r)
|
Complete(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
ctrl "sigs.k8s.io/controller-runtime"
|
ctrl "sigs.k8s.io/controller-runtime"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||||
"sigs.k8s.io/yaml"
|
"sigs.k8s.io/yaml"
|
||||||
|
|
||||||
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
|
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
|
||||||
|
@ -99,11 +100,20 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
|
||||||
return ctrl.Result{RequeueAfter: repository.GetInterval().Duration}, nil
|
return ctrl.Result{RequeueAfter: repository.GetInterval().Duration}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type HelmRepositoryReconcilerOptions struct {
|
||||||
|
MaxConcurrentReconciles int
|
||||||
|
}
|
||||||
|
|
||||||
func (r *HelmRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
func (r *HelmRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||||
|
return r.SetupWithManagerAndOptions(mgr, HelmRepositoryReconcilerOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *HelmRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts HelmRepositoryReconcilerOptions) error {
|
||||||
return ctrl.NewControllerManagedBy(mgr).
|
return ctrl.NewControllerManagedBy(mgr).
|
||||||
For(&sourcev1.HelmRepository{}).
|
For(&sourcev1.HelmRepository{}).
|
||||||
WithEventFilter(SourceChangePredicate{}).
|
WithEventFilter(SourceChangePredicate{}).
|
||||||
WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}).
|
WithEventFilter(GarbageCollectPredicate{Scheme: r.Scheme, Log: r.Log, Storage: r.Storage}).
|
||||||
|
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
|
||||||
Complete(r)
|
Complete(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
25
main.go
25
main.go
|
@ -57,16 +57,21 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
var metricsAddr string
|
var (
|
||||||
var enableLeaderElection bool
|
metricsAddr string
|
||||||
var storagePath string
|
enableLeaderElection bool
|
||||||
var storageAddr string
|
storagePath string
|
||||||
|
storageAddr string
|
||||||
|
concurrent int
|
||||||
|
)
|
||||||
|
|
||||||
flag.StringVar(&metricsAddr, "metrics-addr", ":9090", "The address the metric endpoint binds to.")
|
flag.StringVar(&metricsAddr, "metrics-addr", ":9090", "The address the metric endpoint binds to.")
|
||||||
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
|
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
|
||||||
"Enable leader election for controller manager. "+
|
"Enable leader election for controller manager. "+
|
||||||
"Enabling this will ensure there is only one active controller manager.")
|
"Enabling this will ensure there is only one active controller manager.")
|
||||||
flag.StringVar(&storagePath, "storage-path", "", "The local storage path.")
|
flag.StringVar(&storagePath, "storage-path", "", "The local storage path.")
|
||||||
flag.StringVar(&storageAddr, "storage-addr", ":8080", "The address the static file server binds to.")
|
flag.StringVar(&storageAddr, "storage-addr", ":8080", "The address the static file server binds to.")
|
||||||
|
flag.IntVar(&concurrent, "concurrent", 2, "The number of concurrent reconciles per controller.")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
|
@ -93,7 +98,9 @@ func main() {
|
||||||
Log: ctrl.Log.WithName("controllers").WithName("GitRepository"),
|
Log: ctrl.Log.WithName("controllers").WithName("GitRepository"),
|
||||||
Scheme: mgr.GetScheme(),
|
Scheme: mgr.GetScheme(),
|
||||||
Storage: storage,
|
Storage: storage,
|
||||||
}).SetupWithManager(mgr); err != nil {
|
}).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{
|
||||||
|
MaxConcurrentReconciles: concurrent,
|
||||||
|
}); err != nil {
|
||||||
setupLog.Error(err, "unable to create controller", "controller", "GitRepository")
|
setupLog.Error(err, "unable to create controller", "controller", "GitRepository")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
@ -103,7 +110,9 @@ func main() {
|
||||||
Scheme: mgr.GetScheme(),
|
Scheme: mgr.GetScheme(),
|
||||||
Storage: storage,
|
Storage: storage,
|
||||||
Getters: getters,
|
Getters: getters,
|
||||||
}).SetupWithManager(mgr); err != nil {
|
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
|
||||||
|
MaxConcurrentReconciles: concurrent,
|
||||||
|
}); err != nil {
|
||||||
setupLog.Error(err, "unable to create controller", "controller", "HelmRepository")
|
setupLog.Error(err, "unable to create controller", "controller", "HelmRepository")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
@ -113,7 +122,9 @@ func main() {
|
||||||
Scheme: mgr.GetScheme(),
|
Scheme: mgr.GetScheme(),
|
||||||
Storage: storage,
|
Storage: storage,
|
||||||
Getters: getters,
|
Getters: getters,
|
||||||
}).SetupWithManager(mgr); err != nil {
|
}).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{
|
||||||
|
MaxConcurrentReconciles: concurrent,
|
||||||
|
}); err != nil {
|
||||||
setupLog.Error(err, "unable to create controller", "controller", "HelmChart")
|
setupLog.Error(err, "unable to create controller", "controller", "HelmChart")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue