diff --git a/main.go b/main.go index 06ba2d07..e07bf51c 100644 --- a/main.go +++ b/main.go @@ -24,15 +24,14 @@ import ( "path/filepath" "time" - "github.com/go-logr/logr" flag "github.com/spf13/pflag" "helm.sh/helm/v3/pkg/getter" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -159,102 +158,28 @@ func main() { logger.SetLogger(logger.NewLogger(logOptions)) - err := featureGates.WithLogger(setupLog). - SupportedFeatures(features.FeatureGates()) - - if err != nil { + if err := featureGates.WithLogger(setupLog).SupportedFeatures(features.FeatureGates()); err != nil { setupLog.Error(err, "unable to load feature gates") os.Exit(1) } - if artifactDigestAlgo != digest.Canonical.String() { - algo, err := digest.AlgorithmForName(artifactDigestAlgo) - if err != nil { - setupLog.Error(err, "unable to configure canonical digest algorithm") - os.Exit(1) - } - digest.Canonical = algo - } - - helm.MaxIndexSize = helmIndexLimit - helm.MaxChartSize = helmChartLimit - helm.MaxChartFileSize = helmChartFileLimit - - watchNamespace := "" - if !watchOptions.AllNamespaces { - watchNamespace = os.Getenv("RUNTIME_NAMESPACE") - } - - var newSelectingCache ctrlcache.NewCacheFunc - watchSelector, err := helper.GetWatchSelector(watchOptions) - if err != nil { - setupLog.Error(err, "unable to configure watch label selector") - os.Exit(1) - } - if watchSelector != labels.Everything() { - newSelectingCache = ctrlcache.BuilderWithOptions(ctrlcache.Options{ - SelectorsByObject: ctrlcache.SelectorsByObject{ - &v1.GitRepository{}: {Label: watchSelector}, - &v1beta2.HelmRepository{}: {Label: watchSelector}, - &v1beta2.HelmChart{}: {Label: watchSelector}, - &v1beta2.Bucket{}: {Label: watchSelector}, - &v1beta2.OCIRepository{}: {Label: watchSelector}, - }, - }) - } - - var disableCacheFor []ctrlclient.Object - shouldCache, err := features.Enabled(features.CacheSecretsAndConfigMaps) - if err != nil { - setupLog.Error(err, "unable to check feature gate "+features.CacheSecretsAndConfigMaps) - os.Exit(1) - } - if !shouldCache { - disableCacheFor = append(disableCacheFor, &corev1.Secret{}, &corev1.ConfigMap{}) - } - - restConfig := client.GetConfigOrDie(clientOptions) - mgr, err := ctrl.NewManager(restConfig, ctrl.Options{ - Scheme: scheme, - MetricsBindAddress: metricsAddr, - HealthProbeBindAddress: healthAddr, - Port: 9443, - LeaderElection: leaderElectionOptions.Enable, - LeaderElectionReleaseOnCancel: leaderElectionOptions.ReleaseOnCancel, - LeaseDuration: &leaderElectionOptions.LeaseDuration, - RenewDeadline: &leaderElectionOptions.RenewDeadline, - RetryPeriod: &leaderElectionOptions.RetryPeriod, - LeaderElectionID: fmt.Sprintf("%s-leader-election", controllerName), - Namespace: watchNamespace, - Logger: ctrl.Log, - ClientDisableCacheFor: disableCacheFor, - NewCache: newSelectingCache, - }) - if err != nil { - setupLog.Error(err, "unable to start manager") - os.Exit(1) - } + mgr := mustSetupManager(metricsAddr, healthAddr, watchOptions, clientOptions, leaderElectionOptions) probes.SetupChecks(mgr, setupLog) pprof.SetupHandlers(mgr, setupLog) - var eventRecorder *events.Recorder - if eventRecorder, err = events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName); err != nil { - setupLog.Error(err, "unable to create event recorder") - os.Exit(1) - } + metrics := helper.MustMakeMetrics(mgr) + cacheRecorder := cache.MustMakeMetrics() + eventRecorder := mustSetupEventRecorder(mgr, eventsAddr, controllerName) + storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, artifactDigestAlgo) - metricsH := helper.MustMakeMetrics(mgr) + mustSetupHelmLimits(helmIndexLimit, helmChartLimit, helmChartFileLimit) + helmIndexCache, helmIndexCacheItemTTL := mustInitHelmCache(helmCacheMaxSize, helmCacheTTL, helmCachePurgeInterval) - if storageAdvAddr == "" { - storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog) - } - storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, setupLog) - - if err = (&controllers.GitRepositoryReconciler{ + if err := (&controllers.GitRepositoryReconciler{ Client: mgr.GetClient(), EventRecorder: eventRecorder, - Metrics: metricsH, + Metrics: metrics, Storage: storage, ControllerName: controllerName, }).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{ @@ -266,10 +191,10 @@ func main() { os.Exit(1) } - if err = (&controllers.HelmRepositoryOCIReconciler{ + if err := (&controllers.HelmRepositoryOCIReconciler{ Client: mgr.GetClient(), EventRecorder: eventRecorder, - Metrics: metricsH, + Metrics: metrics, Getters: getters, ControllerName: controllerName, RegistryClientGenerator: registry.ClientGenerator, @@ -281,35 +206,15 @@ func main() { os.Exit(1) } - var c *cache.Cache - var ttl time.Duration - if helmCacheMaxSize > 0 { - interval, err := time.ParseDuration(helmCachePurgeInterval) - if err != nil { - setupLog.Error(err, "unable to parse cache purge interval") - os.Exit(1) - } - - ttl, err = time.ParseDuration(helmCacheTTL) - if err != nil { - setupLog.Error(err, "unable to parse cache TTL") - os.Exit(1) - } - - c = cache.New(helmCacheMaxSize, interval) - } - - cacheRecorder := cache.MustMakeMetrics() - - if err = (&controllers.HelmRepositoryReconciler{ + if err := (&controllers.HelmRepositoryReconciler{ Client: mgr.GetClient(), EventRecorder: eventRecorder, - Metrics: metricsH, + Metrics: metrics, Storage: storage, Getters: getters, ControllerName: controllerName, - Cache: c, - TTL: ttl, + Cache: helmIndexCache, + TTL: helmIndexCacheItemTTL, CacheRecorder: cacheRecorder, }).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{ MaxConcurrentReconciles: concurrent, @@ -319,16 +224,16 @@ func main() { os.Exit(1) } - if err = (&controllers.HelmChartReconciler{ + if err := (&controllers.HelmChartReconciler{ Client: mgr.GetClient(), RegistryClientGenerator: registry.ClientGenerator, Storage: storage, Getters: getters, EventRecorder: eventRecorder, - Metrics: metricsH, + Metrics: metrics, ControllerName: controllerName, - Cache: c, - TTL: ttl, + Cache: helmIndexCache, + TTL: helmIndexCacheItemTTL, CacheRecorder: cacheRecorder, }).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{ MaxConcurrentReconciles: concurrent, @@ -337,10 +242,11 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", v1beta2.HelmChartKind) os.Exit(1) } - if err = (&controllers.BucketReconciler{ + + if err := (&controllers.BucketReconciler{ Client: mgr.GetClient(), EventRecorder: eventRecorder, - Metrics: metricsH, + Metrics: metrics, Storage: storage, ControllerName: controllerName, }).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{ @@ -350,12 +256,13 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "Bucket") os.Exit(1) } - if err = (&controllers.OCIRepositoryReconciler{ + + if err := (&controllers.OCIRepositoryReconciler{ Client: mgr.GetClient(), Storage: storage, EventRecorder: eventRecorder, ControllerName: controllerName, - Metrics: metricsH, + Metrics: metrics, }).SetupWithManagerAndOptions(mgr, controllers.OCIRepositoryReconcilerOptions{ MaxConcurrentReconciles: concurrent, RateLimiter: helper.GetRateLimiter(rateLimiterOptions), @@ -371,7 +278,7 @@ func main() { // to handle that. <-mgr.Elected() - startFileServer(storage.BasePath, storageAddr, setupLog) + startFileServer(storage.BasePath, storageAddr) }() setupLog.Info("starting manager") @@ -381,37 +288,142 @@ func main() { } } -func startFileServer(path string, address string, l logr.Logger) { - l.Info("starting file server") +func startFileServer(path string, address string) { + setupLog.Info("starting file server") fs := http.FileServer(http.Dir(path)) mux := http.NewServeMux() mux.Handle("/", fs) err := http.ListenAndServe(address, mux) if err != nil { - l.Error(err, "file server error") + setupLog.Error(err, "file server error") } } -func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, l logr.Logger) *controllers.Storage { +func mustSetupEventRecorder(mgr ctrl.Manager, eventsAddr, controllerName string) record.EventRecorder { + eventRecorder, err := events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName) + if err != nil { + setupLog.Error(err, "unable to create event recorder") + os.Exit(1) + } + return eventRecorder +} + +func mustSetupManager(metricsAddr, healthAddr string, watchOpts helper.WatchOptions, clientOpts client.Options, leaderOpts leaderelection.Options) ctrl.Manager { + watchNamespace := "" + if !watchOpts.AllNamespaces { + watchNamespace = os.Getenv("RUNTIME_NAMESPACE") + } + + watchSelector, err := helper.GetWatchSelector(watchOpts) + if err != nil { + setupLog.Error(err, "unable to configure watch label selector for manager") + os.Exit(1) + } + newSelectingCache := ctrlcache.BuilderWithOptions(ctrlcache.Options{ + SelectorsByObject: ctrlcache.SelectorsByObject{ + &v1.GitRepository{}: {Label: watchSelector}, + &v1beta2.HelmRepository{}: {Label: watchSelector}, + &v1beta2.HelmChart{}: {Label: watchSelector}, + &v1beta2.Bucket{}: {Label: watchSelector}, + &v1beta2.OCIRepository{}: {Label: watchSelector}, + }, + }) + + var disableCacheFor []ctrlclient.Object + shouldCache, err := features.Enabled(features.CacheSecretsAndConfigMaps) + if err != nil { + setupLog.Error(err, "unable to check feature gate "+features.CacheSecretsAndConfigMaps) + os.Exit(1) + } + if !shouldCache { + disableCacheFor = append(disableCacheFor, &corev1.Secret{}, &corev1.ConfigMap{}) + } + + restConfig := client.GetConfigOrDie(clientOpts) + mgr, err := ctrl.NewManager(restConfig, ctrl.Options{ + Scheme: scheme, + MetricsBindAddress: metricsAddr, + HealthProbeBindAddress: healthAddr, + Port: 9443, + LeaderElection: leaderOpts.Enable, + LeaderElectionReleaseOnCancel: leaderOpts.ReleaseOnCancel, + LeaseDuration: &leaderOpts.LeaseDuration, + RenewDeadline: &leaderOpts.RenewDeadline, + RetryPeriod: &leaderOpts.RetryPeriod, + LeaderElectionID: fmt.Sprintf("%s-leader-election", controllerName), + Namespace: watchNamespace, + Logger: ctrl.Log, + ClientDisableCacheFor: disableCacheFor, + NewCache: newSelectingCache, + }) + if err != nil { + setupLog.Error(err, "unable to start manager") + os.Exit(1) + } + return mgr +} + +func mustSetupHelmLimits(indexLimit, chartLimit, chartFileLimit int64) { + helm.MaxIndexSize = indexLimit + helm.MaxChartSize = chartLimit + helm.MaxChartFileSize = chartFileLimit +} + +func mustInitHelmCache(maxSize int, purgeInterval, itemTTL string) (*cache.Cache, time.Duration) { + if maxSize <= 0 { + setupLog.Info("caching of Helm index files is disabled") + return nil, -1 + } + + interval, err := time.ParseDuration(purgeInterval) + if err != nil { + setupLog.Error(err, "unable to parse Helm index cache purge interval") + os.Exit(1) + } + + ttl, err := time.ParseDuration(itemTTL) + if err != nil { + setupLog.Error(err, "unable to parse Helm index cache item TTL") + os.Exit(1) + } + + return cache.New(maxSize, interval), ttl +} + +func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, artifactDigestAlgo string) *controllers.Storage { + if storageAdvAddr == "" { + storageAdvAddr = determineAdvStorageAddr(storageAdvAddr) + } + + if artifactDigestAlgo != digest.Canonical.String() { + algo, err := digest.AlgorithmForName(artifactDigestAlgo) + if err != nil { + setupLog.Error(err, "unable to configure canonical digest algorithm") + os.Exit(1) + } + digest.Canonical = algo + } + if path == "" { p, _ := os.Getwd() + // TODO(hidde): look at this default path, seems to be an artifact of + // old things. path = filepath.Join(p, "bin") os.MkdirAll(path, 0o700) } storage, err := controllers.NewStorage(path, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords) if err != nil { - l.Error(err, "unable to initialise storage") + setupLog.Error(err, "unable to initialise storage") os.Exit(1) } - return storage } -func determineAdvStorageAddr(storageAddr string, l logr.Logger) string { +func determineAdvStorageAddr(storageAddr string) string { host, port, err := net.SplitHostPort(storageAddr) if err != nil { - l.Error(err, "unable to parse storage address") + setupLog.Error(err, "unable to parse storage address") os.Exit(1) } switch host { @@ -422,7 +434,7 @@ func determineAdvStorageAddr(storageAddr string, l logr.Logger) string { if host == "" { hn, err := os.Hostname() if err != nil { - l.Error(err, "0.0.0.0 specified in storage addr but hostname is invalid") + setupLog.Error(err, "0.0.0.0 specified in storage addr but hostname is invalid") os.Exit(1) } host = hn