Add reconciler sharding capability based on label selector

Signed-off-by: Stefan Prodan <stefan.prodan@gmail.com>
Stefan Prodan 2023-03-29 14:23:55 +03:00
parent 0ba76c01c3
commit 74cadb4d43
2 changed files with 26 additions and 8 deletions
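
Sharding here means that each controller replica watches only the Kustomization objects whose labels match its own selector, so several replicas can split a large set of objects between them. As a minimal sketch outside this commit (the sharding.fluxcd.io/key label and the shard values below are illustrative assumptions, not taken from the diff):

	// Sketch: how a watch label selector decides which objects a shard reconciles.
	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/labels"
	)

	func main() {
		// Hypothetical selector this shard was started with, e.g. via a
		// watch label selector flag.
		shardSelector, err := labels.Parse("sharding.fluxcd.io/key=shard1")
		if err != nil {
			panic(err)
		}

		// Labels of two hypothetical Kustomization objects.
		objects := map[string]labels.Set{
			"apps/podinfo": {"sharding.fluxcd.io/key": "shard1"},
			"apps/backend": {"sharding.fluxcd.io/key": "shard2"},
		}

		for name, lbls := range objects {
			// Only objects matching the selector are cached and reconciled by this shard.
			fmt.Printf("%s handled by this shard: %v\n", name, shardSelector.Matches(lbls))
		}
	}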


@@ -456,14 +456,14 @@ func TestKustomizationReconciler_FluxTransformers(t *testing.T) {
   path: /metadata/labels/patch1
   value: inline-json
 `,
-			Target: kustomize.Selector{
+			Target: &kustomize.Selector{
 				LabelSelector: "app=podinfo",
 			},
 		},
 		{
 			Patch: `
-apiVersion: v1
-kind: Pod
+apiVersion: apps/v1
+kind: Deployment
 metadata:
   name: podinfo
   labels:

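In main.go below, the watch options are bound to the command line, the manager's cache is restricted to Kustomizations matching the watch label selector, and the leader-election ID is derived from that selector so that differently-scoped replicas do not compete for the same lease. A hedged sketch of that last idea, assuming a hash-suffix scheme (the helper below is illustrative, not the fluxcd/pkg/runtime/leaderelection implementation):

	// Sketch: derive a per-shard leader-election lock name from the label selector.
	package main

	import (
		"crypto/sha256"
		"fmt"
	)

	// shardedLeaderElectionID appends a short digest of the watch label selector,
	// so each shard acquires its own lease (hypothetical stand-in for GenerateID).
	func shardedLeaderElectionID(base, labelSelector string) string {
		if labelSelector == "" {
			return base
		}
		sum := sha256.Sum256([]byte(labelSelector))
		return fmt.Sprintf("%s-%x", base, sum[:4])
	}

	func main() {
		fmt.Println(shardedLeaderElectionID("kustomize-controller-leader-election", ""))
		fmt.Println(shardedLeaderElectionID("kustomize-controller-leader-election", "sharding.fluxcd.io/key=shard1"))
	}
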
main.go

@@ -31,6 +31,7 @@ import (
 	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
 	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
 	ctrl "sigs.k8s.io/controller-runtime"
+	ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
 	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

 	"github.com/fluxcd/pkg/runtime/acl"
@@ -78,8 +79,8 @@ func main() {
 		logOptions logger.Options
 		leaderElectionOptions leaderelection.Options
 		rateLimiterOptions runtimeCtrl.RateLimiterOptions
+		watchOptions runtimeCtrl.WatchOptions
 		aclOptions acl.Options
-		watchAllNamespaces bool
 		noRemoteBases bool
 		httpRetry int
 		defaultServiceAccount string
@ -91,8 +92,6 @@ func main() {
flag.StringVar(&healthAddr, "health-addr", ":9440", "The address the health endpoint binds to.")
flag.IntVar(&concurrent, "concurrent", 4, "The number of concurrent kustomize reconciles.")
flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second, "The interval at which failing dependencies are reevaluated.")
flag.BoolVar(&watchAllNamespaces, "watch-all-namespaces", true,
"Watch for custom resources in all namespaces, if set to false it will only watch the runtime namespace.")
flag.BoolVar(&noRemoteBases, "no-remote-bases", false,
"Disallow remote bases usage in Kustomize overlays. When this flag is enabled, all resources must refer to local files included in the source artifact.")
flag.IntVar(&httpRetry, "http-retry", 9, "The maximum number of retries when failing to fetch artifacts over HTTP.")
@@ -105,6 +104,7 @@ func main() {
 	kubeConfigOpts.BindFlags(flag.CommandLine)
 	rateLimiterOptions.BindFlags(flag.CommandLine)
 	featureGates.BindFlags(flag.CommandLine)
+	watchOptions.BindFlags(flag.CommandLine)

 	flag.Parse()
@@ -116,10 +116,16 @@ func main() {
 	}

 	watchNamespace := ""
-	if !watchAllNamespaces {
+	if !watchOptions.AllNamespaces {
 		watchNamespace = os.Getenv("RUNTIME_NAMESPACE")
 	}

+	watchSelector, err := runtimeCtrl.GetWatchSelector(watchOptions)
+	if err != nil {
+		setupLog.Error(err, "unable to configure watch label selector for manager")
+		os.Exit(1)
+	}
+
 	var disableCacheFor []ctrlclient.Object
 	shouldCache, err := features.Enabled(features.CacheSecretsAndConfigMaps)
 	if err != nil {
@@ -130,6 +136,11 @@ func main() {
 		disableCacheFor = append(disableCacheFor, &corev1.Secret{}, &corev1.ConfigMap{})
 	}

+	leaderElectionId := fmt.Sprintf("%s-%s", controllerName, "leader-election")
+	if watchOptions.LabelSelector != "" {
+		leaderElectionId = leaderelection.GenerateID(leaderElectionId, watchOptions.LabelSelector)
+	}
+
 	restConfig := runtimeClient.GetConfigOrDie(clientOptions)
 	mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
 		Scheme: scheme,
@@ -141,10 +152,15 @@ func main() {
 		LeaseDuration: &leaderElectionOptions.LeaseDuration,
 		RenewDeadline: &leaderElectionOptions.RenewDeadline,
 		RetryPeriod: &leaderElectionOptions.RetryPeriod,
-		LeaderElectionID: fmt.Sprintf("%s-leader-election", controllerName),
+		LeaderElectionID: leaderElectionId,
 		Namespace: watchNamespace,
 		Logger: ctrl.Log,
 		ClientDisableCacheFor: disableCacheFor,
+		NewCache: ctrlcache.BuilderWithOptions(ctrlcache.Options{
+			SelectorsByObject: ctrlcache.SelectorsByObject{
+				&kustomizev1.Kustomization{}: {Label: watchSelector},
+			},
+		}),
 	})
 	if err != nil {
 		setupLog.Error(err, "unable to start manager")
@@ -166,9 +182,11 @@ func main() {
 	pollingOpts := polling.Options{
 		CustomStatusReaders: []engine.StatusReader{jobStatusReader},
 	}
 	if ok, _ := features.Enabled(features.DisableStatusPollerCache); ok {
 		pollingOpts.ClusterReaderFactory = engine.ClusterReaderFactoryFunc(clusterreader.NewDirectClusterReader)
 	}
 	if err = (&controllers.KustomizationReconciler{
 		ControllerName: controllerName,
 		DefaultServiceAccount: defaultServiceAccount,