Merge pull request #698 from cwyl02/cwyl02/cache-helmrepo-early
feat: cache helmrepo early after reconcil
This commit is contained in:
commit
3013b5fc3d
|
@ -46,6 +46,7 @@ import (
|
|||
"github.com/fluxcd/pkg/runtime/predicates"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||
"github.com/fluxcd/source-controller/internal/cache"
|
||||
serror "github.com/fluxcd/source-controller/internal/error"
|
||||
"github.com/fluxcd/source-controller/internal/helm/getter"
|
||||
"github.com/fluxcd/source-controller/internal/helm/repository"
|
||||
|
@ -105,6 +106,10 @@ type HelmRepositoryReconciler struct {
|
|||
Getters helmgetter.Providers
|
||||
Storage *Storage
|
||||
ControllerName string
|
||||
|
||||
Cache *cache.Cache
|
||||
TTL time.Duration
|
||||
*cache.CacheRecorder
|
||||
}
|
||||
|
||||
type HelmRepositoryReconcilerOptions struct {
|
||||
|
@ -451,7 +456,6 @@ func (r *HelmRepositoryReconciler) reconcileSource(ctx context.Context, obj *sou
|
|||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
chartRepo.Unload()
|
||||
|
||||
// Mark observations about the revision on the object.
|
||||
if !obj.GetArtifact().HasRevision(chartRepo.Checksum) {
|
||||
|
@ -492,6 +496,8 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s
|
|||
"stored artifact for revision '%s'", artifact.Revision)
|
||||
}
|
||||
|
||||
chartRepo.Unload()
|
||||
|
||||
if err := chartRepo.RemoveCache(); err != nil {
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to remove temporary cached index file")
|
||||
}
|
||||
|
@ -545,6 +551,26 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s
|
|||
obj.Status.URL = indexURL
|
||||
}
|
||||
conditions.Delete(obj, sourcev1.StorageOperationFailedCondition)
|
||||
|
||||
// enable cache if applicable
|
||||
if r.Cache != nil && chartRepo.IndexCache == nil {
|
||||
chartRepo.SetMemCache(r.Storage.LocalPath(*artifact), r.Cache, r.TTL, func(event string) {
|
||||
r.IncCacheEvents(event, obj.GetName(), obj.GetNamespace())
|
||||
})
|
||||
}
|
||||
|
||||
// Cache the index if it was successfully retrieved
|
||||
// and the chart was successfully built
|
||||
if r.Cache != nil && chartRepo.Index != nil {
|
||||
// The cache key have to be safe in multi-tenancy environments,
|
||||
// as otherwise it could be used as a vector to bypass the helm repository's authentication.
|
||||
// Using r.Storage.LocalPath(*repo.GetArtifact() is safe as the path is in the format /<helm-repository-name>/<chart-name>/<filename>.
|
||||
err := chartRepo.CacheIndexInMemory()
|
||||
if err != nil {
|
||||
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
return sreconcile.ResultSuccess, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1299,3 +1299,61 @@ func TestHelmRepositoryReconciler_ReconcileSpecUpdatePredicateFilter(t *testing.
|
|||
return false
|
||||
}, timeout).Should(BeTrue())
|
||||
}
|
||||
|
||||
func TestHelmRepositoryReconciler_InMemoryCaching(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
testCache.Clear()
|
||||
|
||||
testServer, err := helmtestserver.NewTempHelmServer()
|
||||
g.Expect(err).NotTo(HaveOccurred())
|
||||
defer os.RemoveAll(testServer.Root())
|
||||
|
||||
g.Expect(testServer.PackageChartWithVersion("testdata/charts/helmchart", "0.1.0")).To(Succeed())
|
||||
g.Expect(testServer.GenerateIndex()).To(Succeed())
|
||||
|
||||
testServer.Start()
|
||||
defer testServer.Stop()
|
||||
|
||||
ns, err := testEnv.CreateNamespace(ctx, "helmrepository")
|
||||
g.Expect(err).ToNot(HaveOccurred())
|
||||
defer func() { g.Expect(testEnv.Delete(ctx, ns)).To(Succeed()) }()
|
||||
|
||||
helmRepo := &sourcev1.HelmRepository{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
GenerateName: "helmrepository-",
|
||||
Namespace: ns.Name,
|
||||
},
|
||||
Spec: sourcev1.HelmRepositorySpec{
|
||||
URL: testServer.URL(),
|
||||
},
|
||||
}
|
||||
g.Expect(testEnv.CreateAndWait(ctx, helmRepo)).To(Succeed())
|
||||
|
||||
key := client.ObjectKey{Name: helmRepo.Name, Namespace: helmRepo.Namespace}
|
||||
// Wait for finalizer to be set
|
||||
g.Eventually(func() bool {
|
||||
if err := testEnv.Get(ctx, key, helmRepo); err != nil {
|
||||
return false
|
||||
}
|
||||
return len(helmRepo.Finalizers) > 0
|
||||
}, timeout).Should(BeTrue())
|
||||
|
||||
// Wait for HelmRepository to be Ready
|
||||
g.Eventually(func() bool {
|
||||
if err := testEnv.Get(ctx, key, helmRepo); err != nil {
|
||||
return false
|
||||
}
|
||||
if !conditions.IsReady(helmRepo) || helmRepo.Status.Artifact == nil {
|
||||
return false
|
||||
}
|
||||
readyCondition := conditions.Get(helmRepo, meta.ReadyCondition)
|
||||
return helmRepo.Generation == readyCondition.ObservedGeneration &&
|
||||
helmRepo.Generation == helmRepo.Status.ObservedGeneration
|
||||
}, timeout).Should(BeTrue())
|
||||
|
||||
err = testEnv.Get(ctx, key, helmRepo)
|
||||
g.Expect(err).ToNot(HaveOccurred())
|
||||
localPath := testStorage.LocalPath(*helmRepo.GetArtifact())
|
||||
_, cacheHit := testCache.Get(localPath)
|
||||
g.Expect(cacheHit).To(BeTrue())
|
||||
}
|
||||
|
|
|
@ -229,12 +229,18 @@ func TestMain(m *testing.M) {
|
|||
panic(fmt.Sprintf("Failed to start BucketReconciler: %v", err))
|
||||
}
|
||||
|
||||
testCache = cache.New(5, 1*time.Second)
|
||||
cacheRecorder := cache.MustMakeMetrics()
|
||||
|
||||
if err := (&HelmRepositoryReconciler{
|
||||
Client: testEnv,
|
||||
EventRecorder: record.NewFakeRecorder(32),
|
||||
Metrics: testMetricsH,
|
||||
Getters: testGetters,
|
||||
Storage: testStorage,
|
||||
Cache: testCache,
|
||||
TTL: 1 * time.Second,
|
||||
CacheRecorder: cacheRecorder,
|
||||
}).SetupWithManager(testEnv); err != nil {
|
||||
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
|
||||
}
|
||||
|
@ -249,8 +255,6 @@ func TestMain(m *testing.M) {
|
|||
panic(fmt.Sprintf("Failed to start HelmRepositoryOCIReconciler: %v", err))
|
||||
}
|
||||
|
||||
testCache = cache.New(5, 1*time.Second)
|
||||
cacheRecorder := cache.MustMakeMetrics()
|
||||
if err := (&HelmChartReconciler{
|
||||
Client: testEnv,
|
||||
EventRecorder: record.NewFakeRecorder(32),
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fluxcd/source-controller/internal/cache"
|
||||
"github.com/fluxcd/source-controller/internal/helm"
|
||||
. "github.com/onsi/gomega"
|
||||
"helm.sh/helm/v3/pkg/chart"
|
||||
|
@ -450,6 +451,39 @@ func TestChartRepository_StrategicallyLoadIndex(t *testing.T) {
|
|||
g.Expect(r.RemoveCache()).To(Succeed())
|
||||
}
|
||||
|
||||
func TestChartRepository_CacheIndexInMemory(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
|
||||
interval, _ := time.ParseDuration("5s")
|
||||
memCache := cache.New(1, interval)
|
||||
indexPath := "/multi-tenent-safe/mock/index.yaml"
|
||||
r := newChartRepository()
|
||||
r.Index = repo.NewIndexFile()
|
||||
indexFile := *r.Index
|
||||
g.Expect(
|
||||
indexFile.MustAdd(
|
||||
&chart.Metadata{
|
||||
Name: "grafana",
|
||||
Version: "6.17.4",
|
||||
},
|
||||
"grafana-6.17.4.tgz",
|
||||
"http://example.com/charts",
|
||||
"sha256:1234567890abc",
|
||||
)).To(Succeed())
|
||||
indexFile.WriteFile(indexPath, 0o640)
|
||||
ttl, _ := time.ParseDuration("1m")
|
||||
r.SetMemCache(indexPath, memCache, ttl, func(event string) {
|
||||
fmt.Println(event)
|
||||
})
|
||||
r.CacheIndexInMemory()
|
||||
_, cacheHit := r.IndexCache.Get(indexPath)
|
||||
g.Expect(cacheHit).To(Equal(true))
|
||||
r.Unload()
|
||||
g.Expect(r.Index).To(BeNil())
|
||||
g.Expect(r.StrategicallyLoadIndex()).To(Succeed())
|
||||
g.Expect(r.Index.Entries["grafana"][0].Digest).To(Equal("sha256:1234567890abc"))
|
||||
}
|
||||
|
||||
func TestChartRepository_LoadFromCache(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
32
main.go
32
main.go
|
@ -224,20 +224,6 @@ func main() {
|
|||
setupLog.Error(err, "unable to create controller", "controller", sourcev1.GitRepositoryKind)
|
||||
os.Exit(1)
|
||||
}
|
||||
if err = (&controllers.HelmRepositoryReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
EventRecorder: eventRecorder,
|
||||
Metrics: metricsH,
|
||||
Storage: storage,
|
||||
Getters: getters,
|
||||
ControllerName: controllerName,
|
||||
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
|
||||
MaxConcurrentReconciles: concurrent,
|
||||
RateLimiter: helper.GetRateLimiter(rateLimiterOptions),
|
||||
}); err != nil {
|
||||
setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind, "type", "default")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err = (&controllers.HelmRepositoryOCIReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
|
@ -274,6 +260,24 @@ func main() {
|
|||
|
||||
cacheRecorder := cache.MustMakeMetrics()
|
||||
|
||||
if err = (&controllers.HelmRepositoryReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
EventRecorder: eventRecorder,
|
||||
Metrics: metricsH,
|
||||
Storage: storage,
|
||||
Getters: getters,
|
||||
ControllerName: controllerName,
|
||||
Cache: c,
|
||||
TTL: ttl,
|
||||
CacheRecorder: cacheRecorder,
|
||||
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
|
||||
MaxConcurrentReconciles: concurrent,
|
||||
RateLimiter: helper.GetRateLimiter(rateLimiterOptions),
|
||||
}); err != nil {
|
||||
setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err = (&controllers.HelmChartReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
RegistryClientGenerator: registry.ClientGenerator,
|
||||
|
|
Loading…
Reference in New Issue