From d5a75f6b2ffd29be3740f057f91f49080ec17aa2 Mon Sep 17 00:00:00 2001 From: York Chen Date: Thu, 28 Apr 2022 17:06:27 -0400 Subject: [PATCH] feat: cache helmrepo early after reconcile 1. moved chartRepo.Unload() from reconcileSource() to the defer func in reconcileArtifact to allow caching index in memory 2. added step to init memory cache in reconcileArtifact() 3. added step to save helmrepo index into memory cache in reconcileArtifact() Signed-off-by: York Chen --- controllers/helmrepository_controller.go | 28 ++++++++- controllers/helmrepository_controller_test.go | 58 +++++++++++++++++++ controllers/suite_test.go | 8 ++- .../helm/repository/chart_repository_test.go | 34 +++++++++++ main.go | 32 +++++----- 5 files changed, 143 insertions(+), 17 deletions(-) diff --git a/controllers/helmrepository_controller.go b/controllers/helmrepository_controller.go index 6dff9e57..14bdbb24 100644 --- a/controllers/helmrepository_controller.go +++ b/controllers/helmrepository_controller.go @@ -46,6 +46,7 @@ import ( "github.com/fluxcd/pkg/runtime/predicates" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + "github.com/fluxcd/source-controller/internal/cache" serror "github.com/fluxcd/source-controller/internal/error" "github.com/fluxcd/source-controller/internal/helm/getter" "github.com/fluxcd/source-controller/internal/helm/repository" @@ -105,6 +106,10 @@ type HelmRepositoryReconciler struct { Getters helmgetter.Providers Storage *Storage ControllerName string + + Cache *cache.Cache + TTL time.Duration + *cache.CacheRecorder } type HelmRepositoryReconcilerOptions struct { @@ -451,7 +456,6 @@ func (r *HelmRepositoryReconciler) reconcileSource(ctx context.Context, obj *sou conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error()) return sreconcile.ResultEmpty, e } - chartRepo.Unload() // Mark observations about the revision on the object. if !obj.GetArtifact().HasRevision(chartRepo.Checksum) { @@ -492,6 +496,8 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s "stored artifact for revision '%s'", artifact.Revision) } + chartRepo.Unload() + if err := chartRepo.RemoveCache(); err != nil { ctrl.LoggerFrom(ctx).Error(err, "failed to remove temporary cached index file") } @@ -545,6 +551,26 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s obj.Status.URL = indexURL } conditions.Delete(obj, sourcev1.StorageOperationFailedCondition) + + // enable cache if applicable + if r.Cache != nil && chartRepo.IndexCache == nil { + chartRepo.SetMemCache(r.Storage.LocalPath(*artifact), r.Cache, r.TTL, func(event string) { + r.IncCacheEvents(event, obj.GetName(), obj.GetNamespace()) + }) + } + + // Cache the index if it was successfully retrieved + // and the chart was successfully built + if r.Cache != nil && chartRepo.Index != nil { + // The cache key have to be safe in multi-tenancy environments, + // as otherwise it could be used as a vector to bypass the helm repository's authentication. + // Using r.Storage.LocalPath(*repo.GetArtifact() is safe as the path is in the format ///. + err := chartRepo.CacheIndexInMemory() + if err != nil { + r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %s", err) + } + } + return sreconcile.ResultSuccess, nil } diff --git a/controllers/helmrepository_controller_test.go b/controllers/helmrepository_controller_test.go index 3ca34d6e..2e8df487 100644 --- a/controllers/helmrepository_controller_test.go +++ b/controllers/helmrepository_controller_test.go @@ -1299,3 +1299,61 @@ func TestHelmRepositoryReconciler_ReconcileSpecUpdatePredicateFilter(t *testing. return false }, timeout).Should(BeTrue()) } + +func TestHelmRepositoryReconciler_InMemoryCaching(t *testing.T) { + g := NewWithT(t) + testCache.Clear() + + testServer, err := helmtestserver.NewTempHelmServer() + g.Expect(err).NotTo(HaveOccurred()) + defer os.RemoveAll(testServer.Root()) + + g.Expect(testServer.PackageChartWithVersion("testdata/charts/helmchart", "0.1.0")).To(Succeed()) + g.Expect(testServer.GenerateIndex()).To(Succeed()) + + testServer.Start() + defer testServer.Stop() + + ns, err := testEnv.CreateNamespace(ctx, "helmrepository") + g.Expect(err).ToNot(HaveOccurred()) + defer func() { g.Expect(testEnv.Delete(ctx, ns)).To(Succeed()) }() + + helmRepo := &sourcev1.HelmRepository{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "helmrepository-", + Namespace: ns.Name, + }, + Spec: sourcev1.HelmRepositorySpec{ + URL: testServer.URL(), + }, + } + g.Expect(testEnv.CreateAndWait(ctx, helmRepo)).To(Succeed()) + + key := client.ObjectKey{Name: helmRepo.Name, Namespace: helmRepo.Namespace} + // Wait for finalizer to be set + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, helmRepo); err != nil { + return false + } + return len(helmRepo.Finalizers) > 0 + }, timeout).Should(BeTrue()) + + // Wait for HelmRepository to be Ready + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, helmRepo); err != nil { + return false + } + if !conditions.IsReady(helmRepo) || helmRepo.Status.Artifact == nil { + return false + } + readyCondition := conditions.Get(helmRepo, meta.ReadyCondition) + return helmRepo.Generation == readyCondition.ObservedGeneration && + helmRepo.Generation == helmRepo.Status.ObservedGeneration + }, timeout).Should(BeTrue()) + + err = testEnv.Get(ctx, key, helmRepo) + g.Expect(err).ToNot(HaveOccurred()) + localPath := testStorage.LocalPath(*helmRepo.GetArtifact()) + _, cacheHit := testCache.Get(localPath) + g.Expect(cacheHit).To(BeTrue()) +} diff --git a/controllers/suite_test.go b/controllers/suite_test.go index b86a901b..5ab8c339 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -229,12 +229,18 @@ func TestMain(m *testing.M) { panic(fmt.Sprintf("Failed to start BucketReconciler: %v", err)) } + testCache = cache.New(5, 1*time.Second) + cacheRecorder := cache.MustMakeMetrics() + if err := (&HelmRepositoryReconciler{ Client: testEnv, EventRecorder: record.NewFakeRecorder(32), Metrics: testMetricsH, Getters: testGetters, Storage: testStorage, + Cache: testCache, + TTL: 1 * time.Second, + CacheRecorder: cacheRecorder, }).SetupWithManager(testEnv); err != nil { panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err)) } @@ -249,8 +255,6 @@ func TestMain(m *testing.M) { panic(fmt.Sprintf("Failed to start HelmRepositoryOCIReconciler: %v", err)) } - testCache = cache.New(5, 1*time.Second) - cacheRecorder := cache.MustMakeMetrics() if err := (&HelmChartReconciler{ Client: testEnv, EventRecorder: record.NewFakeRecorder(32), diff --git a/internal/helm/repository/chart_repository_test.go b/internal/helm/repository/chart_repository_test.go index ef7f5c9c..4023345b 100644 --- a/internal/helm/repository/chart_repository_test.go +++ b/internal/helm/repository/chart_repository_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/fluxcd/source-controller/internal/cache" "github.com/fluxcd/source-controller/internal/helm" . "github.com/onsi/gomega" "helm.sh/helm/v3/pkg/chart" @@ -450,6 +451,39 @@ func TestChartRepository_StrategicallyLoadIndex(t *testing.T) { g.Expect(r.RemoveCache()).To(Succeed()) } +func TestChartRepository_CacheIndexInMemory(t *testing.T) { + g := NewWithT(t) + + interval, _ := time.ParseDuration("5s") + memCache := cache.New(1, interval) + indexPath := "/multi-tenent-safe/mock/index.yaml" + r := newChartRepository() + r.Index = repo.NewIndexFile() + indexFile := *r.Index + g.Expect( + indexFile.MustAdd( + &chart.Metadata{ + Name: "grafana", + Version: "6.17.4", + }, + "grafana-6.17.4.tgz", + "http://example.com/charts", + "sha256:1234567890abc", + )).To(Succeed()) + indexFile.WriteFile(indexPath, 0o640) + ttl, _ := time.ParseDuration("1m") + r.SetMemCache(indexPath, memCache, ttl, func(event string) { + fmt.Println(event) + }) + r.CacheIndexInMemory() + _, cacheHit := r.IndexCache.Get(indexPath) + g.Expect(cacheHit).To(Equal(true)) + r.Unload() + g.Expect(r.Index).To(BeNil()) + g.Expect(r.StrategicallyLoadIndex()).To(Succeed()) + g.Expect(r.Index.Entries["grafana"][0].Digest).To(Equal("sha256:1234567890abc")) +} + func TestChartRepository_LoadFromCache(t *testing.T) { tests := []struct { name string diff --git a/main.go b/main.go index 030ba335..0121fd62 100644 --- a/main.go +++ b/main.go @@ -224,20 +224,6 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", sourcev1.GitRepositoryKind) os.Exit(1) } - if err = (&controllers.HelmRepositoryReconciler{ - Client: mgr.GetClient(), - EventRecorder: eventRecorder, - Metrics: metricsH, - Storage: storage, - Getters: getters, - ControllerName: controllerName, - }).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{ - MaxConcurrentReconciles: concurrent, - RateLimiter: helper.GetRateLimiter(rateLimiterOptions), - }); err != nil { - setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind, "type", "default") - os.Exit(1) - } if err = (&controllers.HelmRepositoryOCIReconciler{ Client: mgr.GetClient(), @@ -274,6 +260,24 @@ func main() { cacheRecorder := cache.MustMakeMetrics() + if err = (&controllers.HelmRepositoryReconciler{ + Client: mgr.GetClient(), + EventRecorder: eventRecorder, + Metrics: metricsH, + Storage: storage, + Getters: getters, + ControllerName: controllerName, + Cache: c, + TTL: ttl, + CacheRecorder: cacheRecorder, + }).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{ + MaxConcurrentReconciles: concurrent, + RateLimiter: helper.GetRateLimiter(rateLimiterOptions), + }); err != nil { + setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind) + os.Exit(1) + } + if err = (&controllers.HelmChartReconciler{ Client: mgr.GetClient(), RegistryClientGenerator: registry.ClientGenerator,