diff --git a/controllers/helmrepository_controller.go b/controllers/helmrepository_controller.go index 6dff9e57..14bdbb24 100644 --- a/controllers/helmrepository_controller.go +++ b/controllers/helmrepository_controller.go @@ -46,6 +46,7 @@ import ( "github.com/fluxcd/pkg/runtime/predicates" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + "github.com/fluxcd/source-controller/internal/cache" serror "github.com/fluxcd/source-controller/internal/error" "github.com/fluxcd/source-controller/internal/helm/getter" "github.com/fluxcd/source-controller/internal/helm/repository" @@ -105,6 +106,10 @@ type HelmRepositoryReconciler struct { Getters helmgetter.Providers Storage *Storage ControllerName string + + Cache *cache.Cache + TTL time.Duration + *cache.CacheRecorder } type HelmRepositoryReconcilerOptions struct { @@ -451,7 +456,6 @@ func (r *HelmRepositoryReconciler) reconcileSource(ctx context.Context, obj *sou conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error()) return sreconcile.ResultEmpty, e } - chartRepo.Unload() // Mark observations about the revision on the object. if !obj.GetArtifact().HasRevision(chartRepo.Checksum) { @@ -492,6 +496,8 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s "stored artifact for revision '%s'", artifact.Revision) } + chartRepo.Unload() + if err := chartRepo.RemoveCache(); err != nil { ctrl.LoggerFrom(ctx).Error(err, "failed to remove temporary cached index file") } @@ -545,6 +551,26 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s obj.Status.URL = indexURL } conditions.Delete(obj, sourcev1.StorageOperationFailedCondition) + + // enable cache if applicable + if r.Cache != nil && chartRepo.IndexCache == nil { + chartRepo.SetMemCache(r.Storage.LocalPath(*artifact), r.Cache, r.TTL, func(event string) { + r.IncCacheEvents(event, obj.GetName(), obj.GetNamespace()) + }) + } + + // Cache the index if it was successfully retrieved + // and the chart was successfully built + if r.Cache != nil && chartRepo.Index != nil { + // The cache key have to be safe in multi-tenancy environments, + // as otherwise it could be used as a vector to bypass the helm repository's authentication. + // Using r.Storage.LocalPath(*repo.GetArtifact() is safe as the path is in the format ///. + err := chartRepo.CacheIndexInMemory() + if err != nil { + r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %s", err) + } + } + return sreconcile.ResultSuccess, nil } diff --git a/controllers/helmrepository_controller_test.go b/controllers/helmrepository_controller_test.go index 3ca34d6e..2e8df487 100644 --- a/controllers/helmrepository_controller_test.go +++ b/controllers/helmrepository_controller_test.go @@ -1299,3 +1299,61 @@ func TestHelmRepositoryReconciler_ReconcileSpecUpdatePredicateFilter(t *testing. return false }, timeout).Should(BeTrue()) } + +func TestHelmRepositoryReconciler_InMemoryCaching(t *testing.T) { + g := NewWithT(t) + testCache.Clear() + + testServer, err := helmtestserver.NewTempHelmServer() + g.Expect(err).NotTo(HaveOccurred()) + defer os.RemoveAll(testServer.Root()) + + g.Expect(testServer.PackageChartWithVersion("testdata/charts/helmchart", "0.1.0")).To(Succeed()) + g.Expect(testServer.GenerateIndex()).To(Succeed()) + + testServer.Start() + defer testServer.Stop() + + ns, err := testEnv.CreateNamespace(ctx, "helmrepository") + g.Expect(err).ToNot(HaveOccurred()) + defer func() { g.Expect(testEnv.Delete(ctx, ns)).To(Succeed()) }() + + helmRepo := &sourcev1.HelmRepository{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "helmrepository-", + Namespace: ns.Name, + }, + Spec: sourcev1.HelmRepositorySpec{ + URL: testServer.URL(), + }, + } + g.Expect(testEnv.CreateAndWait(ctx, helmRepo)).To(Succeed()) + + key := client.ObjectKey{Name: helmRepo.Name, Namespace: helmRepo.Namespace} + // Wait for finalizer to be set + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, helmRepo); err != nil { + return false + } + return len(helmRepo.Finalizers) > 0 + }, timeout).Should(BeTrue()) + + // Wait for HelmRepository to be Ready + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, helmRepo); err != nil { + return false + } + if !conditions.IsReady(helmRepo) || helmRepo.Status.Artifact == nil { + return false + } + readyCondition := conditions.Get(helmRepo, meta.ReadyCondition) + return helmRepo.Generation == readyCondition.ObservedGeneration && + helmRepo.Generation == helmRepo.Status.ObservedGeneration + }, timeout).Should(BeTrue()) + + err = testEnv.Get(ctx, key, helmRepo) + g.Expect(err).ToNot(HaveOccurred()) + localPath := testStorage.LocalPath(*helmRepo.GetArtifact()) + _, cacheHit := testCache.Get(localPath) + g.Expect(cacheHit).To(BeTrue()) +} diff --git a/controllers/suite_test.go b/controllers/suite_test.go index b86a901b..5ab8c339 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -229,12 +229,18 @@ func TestMain(m *testing.M) { panic(fmt.Sprintf("Failed to start BucketReconciler: %v", err)) } + testCache = cache.New(5, 1*time.Second) + cacheRecorder := cache.MustMakeMetrics() + if err := (&HelmRepositoryReconciler{ Client: testEnv, EventRecorder: record.NewFakeRecorder(32), Metrics: testMetricsH, Getters: testGetters, Storage: testStorage, + Cache: testCache, + TTL: 1 * time.Second, + CacheRecorder: cacheRecorder, }).SetupWithManager(testEnv); err != nil { panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err)) } @@ -249,8 +255,6 @@ func TestMain(m *testing.M) { panic(fmt.Sprintf("Failed to start HelmRepositoryOCIReconciler: %v", err)) } - testCache = cache.New(5, 1*time.Second) - cacheRecorder := cache.MustMakeMetrics() if err := (&HelmChartReconciler{ Client: testEnv, EventRecorder: record.NewFakeRecorder(32), diff --git a/internal/helm/repository/chart_repository_test.go b/internal/helm/repository/chart_repository_test.go index ef7f5c9c..4023345b 100644 --- a/internal/helm/repository/chart_repository_test.go +++ b/internal/helm/repository/chart_repository_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/fluxcd/source-controller/internal/cache" "github.com/fluxcd/source-controller/internal/helm" . "github.com/onsi/gomega" "helm.sh/helm/v3/pkg/chart" @@ -450,6 +451,39 @@ func TestChartRepository_StrategicallyLoadIndex(t *testing.T) { g.Expect(r.RemoveCache()).To(Succeed()) } +func TestChartRepository_CacheIndexInMemory(t *testing.T) { + g := NewWithT(t) + + interval, _ := time.ParseDuration("5s") + memCache := cache.New(1, interval) + indexPath := "/multi-tenent-safe/mock/index.yaml" + r := newChartRepository() + r.Index = repo.NewIndexFile() + indexFile := *r.Index + g.Expect( + indexFile.MustAdd( + &chart.Metadata{ + Name: "grafana", + Version: "6.17.4", + }, + "grafana-6.17.4.tgz", + "http://example.com/charts", + "sha256:1234567890abc", + )).To(Succeed()) + indexFile.WriteFile(indexPath, 0o640) + ttl, _ := time.ParseDuration("1m") + r.SetMemCache(indexPath, memCache, ttl, func(event string) { + fmt.Println(event) + }) + r.CacheIndexInMemory() + _, cacheHit := r.IndexCache.Get(indexPath) + g.Expect(cacheHit).To(Equal(true)) + r.Unload() + g.Expect(r.Index).To(BeNil()) + g.Expect(r.StrategicallyLoadIndex()).To(Succeed()) + g.Expect(r.Index.Entries["grafana"][0].Digest).To(Equal("sha256:1234567890abc")) +} + func TestChartRepository_LoadFromCache(t *testing.T) { tests := []struct { name string diff --git a/main.go b/main.go index 030ba335..0121fd62 100644 --- a/main.go +++ b/main.go @@ -224,20 +224,6 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", sourcev1.GitRepositoryKind) os.Exit(1) } - if err = (&controllers.HelmRepositoryReconciler{ - Client: mgr.GetClient(), - EventRecorder: eventRecorder, - Metrics: metricsH, - Storage: storage, - Getters: getters, - ControllerName: controllerName, - }).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{ - MaxConcurrentReconciles: concurrent, - RateLimiter: helper.GetRateLimiter(rateLimiterOptions), - }); err != nil { - setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind, "type", "default") - os.Exit(1) - } if err = (&controllers.HelmRepositoryOCIReconciler{ Client: mgr.GetClient(), @@ -274,6 +260,24 @@ func main() { cacheRecorder := cache.MustMakeMetrics() + if err = (&controllers.HelmRepositoryReconciler{ + Client: mgr.GetClient(), + EventRecorder: eventRecorder, + Metrics: metricsH, + Storage: storage, + Getters: getters, + ControllerName: controllerName, + Cache: c, + TTL: ttl, + CacheRecorder: cacheRecorder, + }).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{ + MaxConcurrentReconciles: concurrent, + RateLimiter: helper.GetRateLimiter(rateLimiterOptions), + }); err != nil { + setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind) + os.Exit(1) + } + if err = (&controllers.HelmChartReconciler{ Client: mgr.GetClient(), RegistryClientGenerator: registry.ClientGenerator,