Split all inits into separate functions

Yay to readability.

Signed-off-by: Hidde Beydals <hidde@hhh.computer>
This commit is contained in:
Hidde Beydals 2023-03-28 21:03:40 +02:00
parent ed98913897
commit 747d6a335c
No known key found for this signature in database
GPG Key ID: 979F380FC2341744
1 changed files with 142 additions and 130 deletions

272
main.go
View File

@ -24,15 +24,14 @@ import (
"path/filepath"
"time"
"github.com/go-logr/logr"
flag "github.com/spf13/pflag"
"helm.sh/helm/v3/pkg/getter"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
@ -159,102 +158,28 @@ func main() {
logger.SetLogger(logger.NewLogger(logOptions))
err := featureGates.WithLogger(setupLog).
SupportedFeatures(features.FeatureGates())
if err != nil {
if err := featureGates.WithLogger(setupLog).SupportedFeatures(features.FeatureGates()); err != nil {
setupLog.Error(err, "unable to load feature gates")
os.Exit(1)
}
if artifactDigestAlgo != digest.Canonical.String() {
algo, err := digest.AlgorithmForName(artifactDigestAlgo)
if err != nil {
setupLog.Error(err, "unable to configure canonical digest algorithm")
os.Exit(1)
}
digest.Canonical = algo
}
helm.MaxIndexSize = helmIndexLimit
helm.MaxChartSize = helmChartLimit
helm.MaxChartFileSize = helmChartFileLimit
watchNamespace := ""
if !watchOptions.AllNamespaces {
watchNamespace = os.Getenv("RUNTIME_NAMESPACE")
}
var newSelectingCache ctrlcache.NewCacheFunc
watchSelector, err := helper.GetWatchSelector(watchOptions)
if err != nil {
setupLog.Error(err, "unable to configure watch label selector")
os.Exit(1)
}
if watchSelector != labels.Everything() {
newSelectingCache = ctrlcache.BuilderWithOptions(ctrlcache.Options{
SelectorsByObject: ctrlcache.SelectorsByObject{
&v1.GitRepository{}: {Label: watchSelector},
&v1beta2.HelmRepository{}: {Label: watchSelector},
&v1beta2.HelmChart{}: {Label: watchSelector},
&v1beta2.Bucket{}: {Label: watchSelector},
&v1beta2.OCIRepository{}: {Label: watchSelector},
},
})
}
var disableCacheFor []ctrlclient.Object
shouldCache, err := features.Enabled(features.CacheSecretsAndConfigMaps)
if err != nil {
setupLog.Error(err, "unable to check feature gate "+features.CacheSecretsAndConfigMaps)
os.Exit(1)
}
if !shouldCache {
disableCacheFor = append(disableCacheFor, &corev1.Secret{}, &corev1.ConfigMap{})
}
restConfig := client.GetConfigOrDie(clientOptions)
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
HealthProbeBindAddress: healthAddr,
Port: 9443,
LeaderElection: leaderElectionOptions.Enable,
LeaderElectionReleaseOnCancel: leaderElectionOptions.ReleaseOnCancel,
LeaseDuration: &leaderElectionOptions.LeaseDuration,
RenewDeadline: &leaderElectionOptions.RenewDeadline,
RetryPeriod: &leaderElectionOptions.RetryPeriod,
LeaderElectionID: fmt.Sprintf("%s-leader-election", controllerName),
Namespace: watchNamespace,
Logger: ctrl.Log,
ClientDisableCacheFor: disableCacheFor,
NewCache: newSelectingCache,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
mgr := mustSetupManager(metricsAddr, healthAddr, watchOptions, clientOptions, leaderElectionOptions)
probes.SetupChecks(mgr, setupLog)
pprof.SetupHandlers(mgr, setupLog)
var eventRecorder *events.Recorder
if eventRecorder, err = events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName); err != nil {
setupLog.Error(err, "unable to create event recorder")
os.Exit(1)
}
metrics := helper.MustMakeMetrics(mgr)
cacheRecorder := cache.MustMakeMetrics()
eventRecorder := mustSetupEventRecorder(mgr, eventsAddr, controllerName)
storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, artifactDigestAlgo)
metricsH := helper.MustMakeMetrics(mgr)
mustSetupHelmLimits(helmIndexLimit, helmChartLimit, helmChartFileLimit)
helmIndexCache, helmIndexCacheItemTTL := mustInitHelmCache(helmCacheMaxSize, helmCacheTTL, helmCachePurgeInterval)
if storageAdvAddr == "" {
storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog)
}
storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, setupLog)
if err = (&controllers.GitRepositoryReconciler{
if err := (&controllers.GitRepositoryReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Metrics: metricsH,
Metrics: metrics,
Storage: storage,
ControllerName: controllerName,
}).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{
@ -266,10 +191,10 @@ func main() {
os.Exit(1)
}
if err = (&controllers.HelmRepositoryOCIReconciler{
if err := (&controllers.HelmRepositoryOCIReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Metrics: metricsH,
Metrics: metrics,
Getters: getters,
ControllerName: controllerName,
RegistryClientGenerator: registry.ClientGenerator,
@ -281,35 +206,15 @@ func main() {
os.Exit(1)
}
var c *cache.Cache
var ttl time.Duration
if helmCacheMaxSize > 0 {
interval, err := time.ParseDuration(helmCachePurgeInterval)
if err != nil {
setupLog.Error(err, "unable to parse cache purge interval")
os.Exit(1)
}
ttl, err = time.ParseDuration(helmCacheTTL)
if err != nil {
setupLog.Error(err, "unable to parse cache TTL")
os.Exit(1)
}
c = cache.New(helmCacheMaxSize, interval)
}
cacheRecorder := cache.MustMakeMetrics()
if err = (&controllers.HelmRepositoryReconciler{
if err := (&controllers.HelmRepositoryReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Metrics: metricsH,
Metrics: metrics,
Storage: storage,
Getters: getters,
ControllerName: controllerName,
Cache: c,
TTL: ttl,
Cache: helmIndexCache,
TTL: helmIndexCacheItemTTL,
CacheRecorder: cacheRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
@ -319,16 +224,16 @@ func main() {
os.Exit(1)
}
if err = (&controllers.HelmChartReconciler{
if err := (&controllers.HelmChartReconciler{
Client: mgr.GetClient(),
RegistryClientGenerator: registry.ClientGenerator,
Storage: storage,
Getters: getters,
EventRecorder: eventRecorder,
Metrics: metricsH,
Metrics: metrics,
ControllerName: controllerName,
Cache: c,
TTL: ttl,
Cache: helmIndexCache,
TTL: helmIndexCacheItemTTL,
CacheRecorder: cacheRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{
MaxConcurrentReconciles: concurrent,
@ -337,10 +242,11 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", v1beta2.HelmChartKind)
os.Exit(1)
}
if err = (&controllers.BucketReconciler{
if err := (&controllers.BucketReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Metrics: metricsH,
Metrics: metrics,
Storage: storage,
ControllerName: controllerName,
}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
@ -350,12 +256,13 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "Bucket")
os.Exit(1)
}
if err = (&controllers.OCIRepositoryReconciler{
if err := (&controllers.OCIRepositoryReconciler{
Client: mgr.GetClient(),
Storage: storage,
EventRecorder: eventRecorder,
ControllerName: controllerName,
Metrics: metricsH,
Metrics: metrics,
}).SetupWithManagerAndOptions(mgr, controllers.OCIRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
RateLimiter: helper.GetRateLimiter(rateLimiterOptions),
@ -371,7 +278,7 @@ func main() {
// to handle that.
<-mgr.Elected()
startFileServer(storage.BasePath, storageAddr, setupLog)
startFileServer(storage.BasePath, storageAddr)
}()
setupLog.Info("starting manager")
@ -381,37 +288,142 @@ func main() {
}
}
func startFileServer(path string, address string, l logr.Logger) {
l.Info("starting file server")
func startFileServer(path string, address string) {
setupLog.Info("starting file server")
fs := http.FileServer(http.Dir(path))
mux := http.NewServeMux()
mux.Handle("/", fs)
err := http.ListenAndServe(address, mux)
if err != nil {
l.Error(err, "file server error")
setupLog.Error(err, "file server error")
}
}
func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, l logr.Logger) *controllers.Storage {
func mustSetupEventRecorder(mgr ctrl.Manager, eventsAddr, controllerName string) record.EventRecorder {
eventRecorder, err := events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName)
if err != nil {
setupLog.Error(err, "unable to create event recorder")
os.Exit(1)
}
return eventRecorder
}
func mustSetupManager(metricsAddr, healthAddr string, watchOpts helper.WatchOptions, clientOpts client.Options, leaderOpts leaderelection.Options) ctrl.Manager {
watchNamespace := ""
if !watchOpts.AllNamespaces {
watchNamespace = os.Getenv("RUNTIME_NAMESPACE")
}
watchSelector, err := helper.GetWatchSelector(watchOpts)
if err != nil {
setupLog.Error(err, "unable to configure watch label selector for manager")
os.Exit(1)
}
newSelectingCache := ctrlcache.BuilderWithOptions(ctrlcache.Options{
SelectorsByObject: ctrlcache.SelectorsByObject{
&v1.GitRepository{}: {Label: watchSelector},
&v1beta2.HelmRepository{}: {Label: watchSelector},
&v1beta2.HelmChart{}: {Label: watchSelector},
&v1beta2.Bucket{}: {Label: watchSelector},
&v1beta2.OCIRepository{}: {Label: watchSelector},
},
})
var disableCacheFor []ctrlclient.Object
shouldCache, err := features.Enabled(features.CacheSecretsAndConfigMaps)
if err != nil {
setupLog.Error(err, "unable to check feature gate "+features.CacheSecretsAndConfigMaps)
os.Exit(1)
}
if !shouldCache {
disableCacheFor = append(disableCacheFor, &corev1.Secret{}, &corev1.ConfigMap{})
}
restConfig := client.GetConfigOrDie(clientOpts)
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
HealthProbeBindAddress: healthAddr,
Port: 9443,
LeaderElection: leaderOpts.Enable,
LeaderElectionReleaseOnCancel: leaderOpts.ReleaseOnCancel,
LeaseDuration: &leaderOpts.LeaseDuration,
RenewDeadline: &leaderOpts.RenewDeadline,
RetryPeriod: &leaderOpts.RetryPeriod,
LeaderElectionID: fmt.Sprintf("%s-leader-election", controllerName),
Namespace: watchNamespace,
Logger: ctrl.Log,
ClientDisableCacheFor: disableCacheFor,
NewCache: newSelectingCache,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
return mgr
}
func mustSetupHelmLimits(indexLimit, chartLimit, chartFileLimit int64) {
helm.MaxIndexSize = indexLimit
helm.MaxChartSize = chartLimit
helm.MaxChartFileSize = chartFileLimit
}
func mustInitHelmCache(maxSize int, purgeInterval, itemTTL string) (*cache.Cache, time.Duration) {
if maxSize <= 0 {
setupLog.Info("caching of Helm index files is disabled")
return nil, -1
}
interval, err := time.ParseDuration(purgeInterval)
if err != nil {
setupLog.Error(err, "unable to parse Helm index cache purge interval")
os.Exit(1)
}
ttl, err := time.ParseDuration(itemTTL)
if err != nil {
setupLog.Error(err, "unable to parse Helm index cache item TTL")
os.Exit(1)
}
return cache.New(maxSize, interval), ttl
}
func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, artifactDigestAlgo string) *controllers.Storage {
if storageAdvAddr == "" {
storageAdvAddr = determineAdvStorageAddr(storageAdvAddr)
}
if artifactDigestAlgo != digest.Canonical.String() {
algo, err := digest.AlgorithmForName(artifactDigestAlgo)
if err != nil {
setupLog.Error(err, "unable to configure canonical digest algorithm")
os.Exit(1)
}
digest.Canonical = algo
}
if path == "" {
p, _ := os.Getwd()
// TODO(hidde): look at this default path, seems to be an artifact of
// old things.
path = filepath.Join(p, "bin")
os.MkdirAll(path, 0o700)
}
storage, err := controllers.NewStorage(path, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords)
if err != nil {
l.Error(err, "unable to initialise storage")
setupLog.Error(err, "unable to initialise storage")
os.Exit(1)
}
return storage
}
func determineAdvStorageAddr(storageAddr string, l logr.Logger) string {
func determineAdvStorageAddr(storageAddr string) string {
host, port, err := net.SplitHostPort(storageAddr)
if err != nil {
l.Error(err, "unable to parse storage address")
setupLog.Error(err, "unable to parse storage address")
os.Exit(1)
}
switch host {
@ -422,7 +434,7 @@ func determineAdvStorageAddr(storageAddr string, l logr.Logger) string {
if host == "" {
hn, err := os.Hostname()
if err != nil {
l.Error(err, "0.0.0.0 specified in storage addr but hostname is invalid")
setupLog.Error(err, "0.0.0.0 specified in storage addr but hostname is invalid")
os.Exit(1)
}
host = hn