feat: cache helmrepo early after reconcile
1. moved chartRepo.Unload() from reconcileSource() to the defer func in reconcileArtifact to allow caching index in memory 2. added step to init memory cache in reconcileArtifact() 3. added step to save helmrepo index into memory cache in reconcileArtifact() Signed-off-by: York Chen <ychen@d2iq.com>
This commit is contained in:
parent
a6072c3301
commit
d5a75f6b2f
|
@ -46,6 +46,7 @@ import (
|
|||
"github.com/fluxcd/pkg/runtime/predicates"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||
"github.com/fluxcd/source-controller/internal/cache"
|
||||
serror "github.com/fluxcd/source-controller/internal/error"
|
||||
"github.com/fluxcd/source-controller/internal/helm/getter"
|
||||
"github.com/fluxcd/source-controller/internal/helm/repository"
|
||||
|
@ -105,6 +106,10 @@ type HelmRepositoryReconciler struct {
|
|||
Getters helmgetter.Providers
|
||||
Storage *Storage
|
||||
ControllerName string
|
||||
|
||||
Cache *cache.Cache
|
||||
TTL time.Duration
|
||||
*cache.CacheRecorder
|
||||
}
|
||||
|
||||
type HelmRepositoryReconcilerOptions struct {
|
||||
|
@ -451,7 +456,6 @@ func (r *HelmRepositoryReconciler) reconcileSource(ctx context.Context, obj *sou
|
|||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
chartRepo.Unload()
|
||||
|
||||
// Mark observations about the revision on the object.
|
||||
if !obj.GetArtifact().HasRevision(chartRepo.Checksum) {
|
||||
|
@ -492,6 +496,8 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s
|
|||
"stored artifact for revision '%s'", artifact.Revision)
|
||||
}
|
||||
|
||||
chartRepo.Unload()
|
||||
|
||||
if err := chartRepo.RemoveCache(); err != nil {
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to remove temporary cached index file")
|
||||
}
|
||||
|
@ -545,6 +551,26 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s
|
|||
obj.Status.URL = indexURL
|
||||
}
|
||||
conditions.Delete(obj, sourcev1.StorageOperationFailedCondition)
|
||||
|
||||
// enable cache if applicable
|
||||
if r.Cache != nil && chartRepo.IndexCache == nil {
|
||||
chartRepo.SetMemCache(r.Storage.LocalPath(*artifact), r.Cache, r.TTL, func(event string) {
|
||||
r.IncCacheEvents(event, obj.GetName(), obj.GetNamespace())
|
||||
})
|
||||
}
|
||||
|
||||
// Cache the index if it was successfully retrieved
|
||||
// and the chart was successfully built
|
||||
if r.Cache != nil && chartRepo.Index != nil {
|
||||
// The cache key have to be safe in multi-tenancy environments,
|
||||
// as otherwise it could be used as a vector to bypass the helm repository's authentication.
|
||||
// Using r.Storage.LocalPath(*repo.GetArtifact() is safe as the path is in the format /<helm-repository-name>/<chart-name>/<filename>.
|
||||
err := chartRepo.CacheIndexInMemory()
|
||||
if err != nil {
|
||||
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
return sreconcile.ResultSuccess, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1299,3 +1299,61 @@ func TestHelmRepositoryReconciler_ReconcileSpecUpdatePredicateFilter(t *testing.
|
|||
return false
|
||||
}, timeout).Should(BeTrue())
|
||||
}
|
||||
|
||||
func TestHelmRepositoryReconciler_InMemoryCaching(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
testCache.Clear()
|
||||
|
||||
testServer, err := helmtestserver.NewTempHelmServer()
|
||||
g.Expect(err).NotTo(HaveOccurred())
|
||||
defer os.RemoveAll(testServer.Root())
|
||||
|
||||
g.Expect(testServer.PackageChartWithVersion("testdata/charts/helmchart", "0.1.0")).To(Succeed())
|
||||
g.Expect(testServer.GenerateIndex()).To(Succeed())
|
||||
|
||||
testServer.Start()
|
||||
defer testServer.Stop()
|
||||
|
||||
ns, err := testEnv.CreateNamespace(ctx, "helmrepository")
|
||||
g.Expect(err).ToNot(HaveOccurred())
|
||||
defer func() { g.Expect(testEnv.Delete(ctx, ns)).To(Succeed()) }()
|
||||
|
||||
helmRepo := &sourcev1.HelmRepository{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
GenerateName: "helmrepository-",
|
||||
Namespace: ns.Name,
|
||||
},
|
||||
Spec: sourcev1.HelmRepositorySpec{
|
||||
URL: testServer.URL(),
|
||||
},
|
||||
}
|
||||
g.Expect(testEnv.CreateAndWait(ctx, helmRepo)).To(Succeed())
|
||||
|
||||
key := client.ObjectKey{Name: helmRepo.Name, Namespace: helmRepo.Namespace}
|
||||
// Wait for finalizer to be set
|
||||
g.Eventually(func() bool {
|
||||
if err := testEnv.Get(ctx, key, helmRepo); err != nil {
|
||||
return false
|
||||
}
|
||||
return len(helmRepo.Finalizers) > 0
|
||||
}, timeout).Should(BeTrue())
|
||||
|
||||
// Wait for HelmRepository to be Ready
|
||||
g.Eventually(func() bool {
|
||||
if err := testEnv.Get(ctx, key, helmRepo); err != nil {
|
||||
return false
|
||||
}
|
||||
if !conditions.IsReady(helmRepo) || helmRepo.Status.Artifact == nil {
|
||||
return false
|
||||
}
|
||||
readyCondition := conditions.Get(helmRepo, meta.ReadyCondition)
|
||||
return helmRepo.Generation == readyCondition.ObservedGeneration &&
|
||||
helmRepo.Generation == helmRepo.Status.ObservedGeneration
|
||||
}, timeout).Should(BeTrue())
|
||||
|
||||
err = testEnv.Get(ctx, key, helmRepo)
|
||||
g.Expect(err).ToNot(HaveOccurred())
|
||||
localPath := testStorage.LocalPath(*helmRepo.GetArtifact())
|
||||
_, cacheHit := testCache.Get(localPath)
|
||||
g.Expect(cacheHit).To(BeTrue())
|
||||
}
|
||||
|
|
|
@ -229,12 +229,18 @@ func TestMain(m *testing.M) {
|
|||
panic(fmt.Sprintf("Failed to start BucketReconciler: %v", err))
|
||||
}
|
||||
|
||||
testCache = cache.New(5, 1*time.Second)
|
||||
cacheRecorder := cache.MustMakeMetrics()
|
||||
|
||||
if err := (&HelmRepositoryReconciler{
|
||||
Client: testEnv,
|
||||
EventRecorder: record.NewFakeRecorder(32),
|
||||
Metrics: testMetricsH,
|
||||
Getters: testGetters,
|
||||
Storage: testStorage,
|
||||
Cache: testCache,
|
||||
TTL: 1 * time.Second,
|
||||
CacheRecorder: cacheRecorder,
|
||||
}).SetupWithManager(testEnv); err != nil {
|
||||
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
|
||||
}
|
||||
|
@ -249,8 +255,6 @@ func TestMain(m *testing.M) {
|
|||
panic(fmt.Sprintf("Failed to start HelmRepositoryOCIReconciler: %v", err))
|
||||
}
|
||||
|
||||
testCache = cache.New(5, 1*time.Second)
|
||||
cacheRecorder := cache.MustMakeMetrics()
|
||||
if err := (&HelmChartReconciler{
|
||||
Client: testEnv,
|
||||
EventRecorder: record.NewFakeRecorder(32),
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fluxcd/source-controller/internal/cache"
|
||||
"github.com/fluxcd/source-controller/internal/helm"
|
||||
. "github.com/onsi/gomega"
|
||||
"helm.sh/helm/v3/pkg/chart"
|
||||
|
@ -450,6 +451,39 @@ func TestChartRepository_StrategicallyLoadIndex(t *testing.T) {
|
|||
g.Expect(r.RemoveCache()).To(Succeed())
|
||||
}
|
||||
|
||||
func TestChartRepository_CacheIndexInMemory(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
|
||||
interval, _ := time.ParseDuration("5s")
|
||||
memCache := cache.New(1, interval)
|
||||
indexPath := "/multi-tenent-safe/mock/index.yaml"
|
||||
r := newChartRepository()
|
||||
r.Index = repo.NewIndexFile()
|
||||
indexFile := *r.Index
|
||||
g.Expect(
|
||||
indexFile.MustAdd(
|
||||
&chart.Metadata{
|
||||
Name: "grafana",
|
||||
Version: "6.17.4",
|
||||
},
|
||||
"grafana-6.17.4.tgz",
|
||||
"http://example.com/charts",
|
||||
"sha256:1234567890abc",
|
||||
)).To(Succeed())
|
||||
indexFile.WriteFile(indexPath, 0o640)
|
||||
ttl, _ := time.ParseDuration("1m")
|
||||
r.SetMemCache(indexPath, memCache, ttl, func(event string) {
|
||||
fmt.Println(event)
|
||||
})
|
||||
r.CacheIndexInMemory()
|
||||
_, cacheHit := r.IndexCache.Get(indexPath)
|
||||
g.Expect(cacheHit).To(Equal(true))
|
||||
r.Unload()
|
||||
g.Expect(r.Index).To(BeNil())
|
||||
g.Expect(r.StrategicallyLoadIndex()).To(Succeed())
|
||||
g.Expect(r.Index.Entries["grafana"][0].Digest).To(Equal("sha256:1234567890abc"))
|
||||
}
|
||||
|
||||
func TestChartRepository_LoadFromCache(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
32
main.go
32
main.go
|
@ -224,20 +224,6 @@ func main() {
|
|||
setupLog.Error(err, "unable to create controller", "controller", sourcev1.GitRepositoryKind)
|
||||
os.Exit(1)
|
||||
}
|
||||
if err = (&controllers.HelmRepositoryReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
EventRecorder: eventRecorder,
|
||||
Metrics: metricsH,
|
||||
Storage: storage,
|
||||
Getters: getters,
|
||||
ControllerName: controllerName,
|
||||
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
|
||||
MaxConcurrentReconciles: concurrent,
|
||||
RateLimiter: helper.GetRateLimiter(rateLimiterOptions),
|
||||
}); err != nil {
|
||||
setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind, "type", "default")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err = (&controllers.HelmRepositoryOCIReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
|
@ -274,6 +260,24 @@ func main() {
|
|||
|
||||
cacheRecorder := cache.MustMakeMetrics()
|
||||
|
||||
if err = (&controllers.HelmRepositoryReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
EventRecorder: eventRecorder,
|
||||
Metrics: metricsH,
|
||||
Storage: storage,
|
||||
Getters: getters,
|
||||
ControllerName: controllerName,
|
||||
Cache: c,
|
||||
TTL: ttl,
|
||||
CacheRecorder: cacheRecorder,
|
||||
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
|
||||
MaxConcurrentReconciles: concurrent,
|
||||
RateLimiter: helper.GetRateLimiter(rateLimiterOptions),
|
||||
}); err != nil {
|
||||
setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err = (&controllers.HelmChartReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
RegistryClientGenerator: registry.ClientGenerator,
|
||||
|
|
Loading…
Reference in New Issue