Add reconciler sharding capability based on label

With this enhancement, the controller can be configured with
`--watch-label-selector`, after which only objects with this label will
be reconciled by the controller.

This allows for horizontal scaling of the source-controller, where each
controller can be deployed multiple times with a unique label selector
which is used as the sharding key.

Note that this also requires configuration of the `--storage-adv-addr`
to a unique address (in combination with a proper Service definition).
This to ensure the Artifacts handled by the sharding controller point
to a unique endpoint.

In addition, Source object kinds which have a dependency on another
kind (i.e. a HelmChart on a HelmRepository) need to have the same
labels applied to work as expected.

Signed-off-by: Hidde Beydals <hidde@hhh.computer>
This commit is contained in:
Hidde Beydals 2023-03-28 15:08:06 +02:00
parent 51dea22347
commit ed98913897
No known key found for this signature in database
GPG Key ID: 979F380FC2341744
3 changed files with 33 additions and 13 deletions

2
go.mod
View File

@ -29,7 +29,7 @@ require (
github.com/fluxcd/pkg/lockedfile v0.1.0
github.com/fluxcd/pkg/masktoken v0.2.0
github.com/fluxcd/pkg/oci v0.21.1
github.com/fluxcd/pkg/runtime v0.33.0
github.com/fluxcd/pkg/runtime v0.34.0
github.com/fluxcd/pkg/sourceignore v0.3.3
github.com/fluxcd/pkg/ssh v0.7.3
github.com/fluxcd/pkg/testserver v0.4.0

4
go.sum
View File

@ -550,8 +550,8 @@ github.com/fluxcd/pkg/masktoken v0.2.0 h1:HoSPTk4l1fz5Fevs2vVRvZGru33blfMwWSZKsH
github.com/fluxcd/pkg/masktoken v0.2.0/go.mod h1:EA7GleAHL33kN6kTW06m5R3/Q26IyuGO7Ef/0CtpDI0=
github.com/fluxcd/pkg/oci v0.21.1 h1:9kn19wkabE2xB77NRlOtMJlSYhZmUjdloZCzlHdAS6s=
github.com/fluxcd/pkg/oci v0.21.1/go.mod h1:9E2DBlQII7YmeWt2ieTh38wwkiBqx3yg5NEJ51uefaA=
github.com/fluxcd/pkg/runtime v0.33.0 h1:y6mFOj22mU/BXAxSTucTlT7vrWUjd0+iccK0pRN5CF0=
github.com/fluxcd/pkg/runtime v0.33.0/go.mod h1:oDTerqMMtOQVNZeidwAPG7g/ai2xuidUduJzQh1IBVI=
github.com/fluxcd/pkg/runtime v0.34.0 h1:vnwsCZcJtD9iE7K8d4rpE6YSYFWDrFOdA85Poagyp8s=
github.com/fluxcd/pkg/runtime v0.34.0/go.mod h1:oDTerqMMtOQVNZeidwAPG7g/ai2xuidUduJzQh1IBVI=
github.com/fluxcd/pkg/sourceignore v0.3.3 h1:Ue29JAuPECEYdvIqdpXpQaDxpeySn7amarLArp7XoIs=
github.com/fluxcd/pkg/sourceignore v0.3.3/go.mod h1:yuJzKggph0Bdbk9LgXjJQhvJZSTJV/1vS7mJuB7mPa0=
github.com/fluxcd/pkg/ssh v0.7.3 h1:Dhs+nXdp806lBriUJtPyRi0SVIVWbJafJGD/qQ71GiY=

40
main.go
View File

@ -28,11 +28,13 @@ import (
flag "github.com/spf13/pflag"
"helm.sh/helm/v3/pkg/getter"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/fluxcd/pkg/git"
@ -45,16 +47,16 @@ import (
"github.com/fluxcd/pkg/runtime/pprof"
"github.com/fluxcd/pkg/runtime/probes"
"github.com/fluxcd/source-controller/internal/digest"
"github.com/fluxcd/source-controller/internal/features"
"github.com/fluxcd/source-controller/internal/helm/registry"
"github.com/fluxcd/source-controller/api/v1"
"github.com/fluxcd/source-controller/api/v1beta2"
// +kubebuilder:scaffold:imports
v1 "github.com/fluxcd/source-controller/api/v1"
v1beta2 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/controllers"
"github.com/fluxcd/source-controller/internal/cache"
"github.com/fluxcd/source-controller/internal/digest"
"github.com/fluxcd/source-controller/internal/features"
"github.com/fluxcd/source-controller/internal/helm"
// +kubebuilder:scaffold:imports
"github.com/fluxcd/source-controller/internal/helm/registry"
)
const controllerName = "source-controller"
@ -92,7 +94,6 @@ func main() {
storageAdvAddr string
concurrent int
requeueDependency time.Duration
watchAllNamespaces bool
helmIndexLimit int64
helmChartLimit int64
helmChartFileLimit int64
@ -101,6 +102,7 @@ func main() {
leaderElectionOptions leaderelection.Options
rateLimiterOptions helper.RateLimiterOptions
featureGates feathelper.FeatureGates
watchOptions helper.WatchOptions
helmCacheMaxSize int
helmCacheTTL string
helmCachePurgeInterval string
@ -121,8 +123,6 @@ func main() {
flag.StringVar(&storageAdvAddr, "storage-adv-addr", envOrDefault("STORAGE_ADV_ADDR", ""),
"The advertised address of the static file server.")
flag.IntVar(&concurrent, "concurrent", 2, "The number of concurrent reconciles per controller.")
flag.BoolVar(&watchAllNamespaces, "watch-all-namespaces", true,
"Watch for custom resources in all namespaces, if set to false it will only watch the runtime namespace.")
flag.Int64Var(&helmIndexLimit, "helm-index-max-size", helm.MaxIndexSize,
"The max allowed size in bytes of a Helm repository index file.")
flag.Int64Var(&helmChartLimit, "helm-chart-max-size", helm.MaxChartSize,
@ -153,6 +153,7 @@ func main() {
leaderElectionOptions.BindFlags(flag.CommandLine)
rateLimiterOptions.BindFlags(flag.CommandLine)
featureGates.BindFlags(flag.CommandLine)
watchOptions.BindFlags(flag.CommandLine)
flag.Parse()
@ -180,10 +181,28 @@ func main() {
helm.MaxChartFileSize = helmChartFileLimit
watchNamespace := ""
if !watchAllNamespaces {
if !watchOptions.AllNamespaces {
watchNamespace = os.Getenv("RUNTIME_NAMESPACE")
}
var newSelectingCache ctrlcache.NewCacheFunc
watchSelector, err := helper.GetWatchSelector(watchOptions)
if err != nil {
setupLog.Error(err, "unable to configure watch label selector")
os.Exit(1)
}
if watchSelector != labels.Everything() {
newSelectingCache = ctrlcache.BuilderWithOptions(ctrlcache.Options{
SelectorsByObject: ctrlcache.SelectorsByObject{
&v1.GitRepository{}: {Label: watchSelector},
&v1beta2.HelmRepository{}: {Label: watchSelector},
&v1beta2.HelmChart{}: {Label: watchSelector},
&v1beta2.Bucket{}: {Label: watchSelector},
&v1beta2.OCIRepository{}: {Label: watchSelector},
},
})
}
var disableCacheFor []ctrlclient.Object
shouldCache, err := features.Enabled(features.CacheSecretsAndConfigMaps)
if err != nil {
@ -209,6 +228,7 @@ func main() {
Namespace: watchNamespace,
Logger: ctrl.Log,
ClientDisableCacheFor: disableCacheFor,
NewCache: newSelectingCache,
})
if err != nil {
setupLog.Error(err, "unable to start manager")