Enable dependency manager to use in memory cache
If implemented this will: - enable the helmCharts dependency manager to use the helm in memry cache to retrieve reconciled HelmRepositories indexes. - record cache events. Signed-off-by: Soule BA <soule@weave.works>
This commit is contained in:
parent
a4d339bf25
commit
0df8dcccec
|
@ -29,7 +29,6 @@ import (
|
|||
"time"
|
||||
|
||||
helmgetter "helm.sh/helm/v3/pkg/getter"
|
||||
helmrepo "helm.sh/helm/v3/pkg/repo"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrs "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
@ -123,6 +122,7 @@ type HelmChartReconciler struct {
|
|||
|
||||
Cache *cache.Cache
|
||||
TTL time.Duration
|
||||
*cache.CacheRecorder
|
||||
}
|
||||
|
||||
func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
|
@ -484,7 +484,10 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
|
|||
}
|
||||
|
||||
// Initialize the chart repository
|
||||
chartRepo, err := repository.NewChartRepository(repo.Spec.URL, r.Storage.LocalPath(*repo.GetArtifact()), r.Getters, tlsConfig, clientOpts)
|
||||
chartRepo, err := repository.NewChartRepository(repo.Spec.URL, r.Storage.LocalPath(*repo.GetArtifact()), r.Getters, tlsConfig, clientOpts,
|
||||
repository.WithMemoryCache(r.Storage.LocalPath(*repo.GetArtifact()), r.Cache, r.TTL, func(event string) {
|
||||
r.IncCacheEvents(event, obj.Name, obj.Namespace)
|
||||
}))
|
||||
if err != nil {
|
||||
// Any error requires a change in generation,
|
||||
// which we should be informed about by the watcher
|
||||
|
@ -506,13 +509,6 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
|
|||
}
|
||||
}
|
||||
|
||||
// Try to retrieve the repository index from the cache
|
||||
if r.Cache != nil {
|
||||
if index, found := r.Cache.Get(r.Storage.LocalPath(*repo.GetArtifact())); found {
|
||||
chartRepo.Index = index.(*helmrepo.IndexFile)
|
||||
}
|
||||
}
|
||||
|
||||
// Construct the chart builder with scoped configuration
|
||||
cb := chart.NewRemoteBuilder(chartRepo)
|
||||
opts := chart.BuildOptions{
|
||||
|
@ -543,11 +539,10 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
|
|||
// The cache key have to be safe in multi-tenancy environments,
|
||||
// as otherwise it could be used as a vector to bypass the helm repository's authentication.
|
||||
// Using r.Storage.LocalPath(*repo.GetArtifact() is safe as the path is in the format /<helm-repository-name>/<chart-name>/<filename>.
|
||||
err := r.Cache.Set(r.Storage.LocalPath(*repo.GetArtifact()), chartRepo.Index, r.TTL)
|
||||
err := chartRepo.CacheIndexInMemory()
|
||||
if err != nil {
|
||||
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %s", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Delete the index reference
|
||||
|
@ -615,7 +610,7 @@ func (r *HelmChartReconciler) buildFromTarballArtifact(ctx context.Context, obj
|
|||
|
||||
// Setup dependency manager
|
||||
dm := chart.NewDependencyManager(
|
||||
chart.WithRepositoryCallback(r.namespacedChartRepositoryCallback(ctx, obj.GetNamespace())),
|
||||
chart.WithRepositoryCallback(r.namespacedChartRepositoryCallback(ctx, obj.GetName(), obj.GetNamespace())),
|
||||
)
|
||||
defer dm.Clear()
|
||||
|
||||
|
@ -847,7 +842,7 @@ func (r *HelmChartReconciler) garbageCollect(ctx context.Context, obj *sourcev1.
|
|||
// namespacedChartRepositoryCallback returns a chart.GetChartRepositoryCallback scoped to the given namespace.
|
||||
// The returned callback returns a repository.ChartRepository configured with the retrieved v1beta1.HelmRepository,
|
||||
// or a shim with defaults if no object could be found.
|
||||
func (r *HelmChartReconciler) namespacedChartRepositoryCallback(ctx context.Context, namespace string) chart.GetChartRepositoryCallback {
|
||||
func (r *HelmChartReconciler) namespacedChartRepositoryCallback(ctx context.Context, name, namespace string) chart.GetChartRepositoryCallback {
|
||||
return func(url string) (*repository.ChartRepository, error) {
|
||||
var tlsConfig *tls.Config
|
||||
repo, err := r.resolveDependencyRepository(ctx, url, namespace)
|
||||
|
@ -888,8 +883,15 @@ func (r *HelmChartReconciler) namespacedChartRepositoryCallback(ctx context.Cont
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Ensure that the cache key is the same as the artifact path
|
||||
// otherwise don't enable caching. We don't want to cache indexes
|
||||
// for repositories that are not reconciled by the source controller.
|
||||
if repo.Status.Artifact != nil {
|
||||
chartRepo.CachePath = r.Storage.LocalPath(*repo.GetArtifact())
|
||||
chartRepo.SetMemCache(r.Storage.LocalPath(*repo.GetArtifact()), r.Cache, r.TTL, func(event string) {
|
||||
r.IncCacheEvents(event, name, namespace)
|
||||
})
|
||||
}
|
||||
return chartRepo, nil
|
||||
}
|
||||
|
|
|
@ -129,15 +129,17 @@ func TestMain(m *testing.M) {
|
|||
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
|
||||
}
|
||||
|
||||
cache := cache.New(5, 1*time.Second)
|
||||
c := cache.New(5, 1*time.Second)
|
||||
cacheRecorder := cache.MustMakeMetrics()
|
||||
if err := (&HelmChartReconciler{
|
||||
Client: testEnv,
|
||||
EventRecorder: record.NewFakeRecorder(32),
|
||||
Metrics: testMetricsH,
|
||||
Getters: testGetters,
|
||||
Storage: testStorage,
|
||||
Cache: cache,
|
||||
Cache: c,
|
||||
TTL: 1 * time.Second,
|
||||
CacheRecorder: cacheRecorder,
|
||||
}).SetupWithManager(testEnv); err != nil {
|
||||
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
|
||||
}
|
||||
|
|
2
go.mod
2
go.mod
|
@ -38,6 +38,7 @@ require (
|
|||
github.com/minio/minio-go/v7 v7.0.24
|
||||
github.com/onsi/gomega v1.19.0
|
||||
github.com/otiai10/copy v1.7.0
|
||||
github.com/prometheus/client_golang v1.12.1
|
||||
github.com/spf13/pflag v1.0.5
|
||||
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||
|
@ -173,7 +174,6 @@ require (
|
|||
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_golang v1.12.1 // indirect
|
||||
github.com/prometheus/client_model v0.2.0 // indirect
|
||||
github.com/prometheus/common v0.32.1 // indirect
|
||||
github.com/prometheus/procfs v0.7.3 // indirect
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
Copyright 2022 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"sigs.k8s.io/controller-runtime/pkg/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
// CacheEventTypeMiss is the event type for cache misses.
|
||||
CacheEventTypeMiss = "cache_miss"
|
||||
// CacheEventTypeHit is the event type for cache hits.
|
||||
CacheEventTypeHit = "cache_hit"
|
||||
)
|
||||
|
||||
// CacheRecorder is a recorder for cache events.
|
||||
type CacheRecorder struct {
|
||||
// cacheEventsCounter is a counter for cache events.
|
||||
cacheEventsCounter *prometheus.CounterVec
|
||||
}
|
||||
|
||||
// NewCacheRecorder returns a new CacheRecorder.
|
||||
// The configured labels are: event_type, name, namespace.
|
||||
// The event_type is one of:
|
||||
// - "miss"
|
||||
// - "hit"
|
||||
// - "update"
|
||||
// The name is the name of the reconciled resource.
|
||||
// The namespace is the namespace of the reconciled resource.
|
||||
func NewCacheRecorder() *CacheRecorder {
|
||||
return &CacheRecorder{
|
||||
cacheEventsCounter: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "gotk_cache_events_total",
|
||||
Help: "Total number of cache retrieval events for a Gitops Toolkit resource reconciliation.",
|
||||
},
|
||||
[]string{"event_type", "name", "namespace"},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// Collectors returns the metrics.Collector objects for the CacheRecorder.
|
||||
func (r *CacheRecorder) Collectors() []prometheus.Collector {
|
||||
return []prometheus.Collector{
|
||||
r.cacheEventsCounter,
|
||||
}
|
||||
}
|
||||
|
||||
// IncCacheEventCount increment by 1 the cache event count for the given event type, name and namespace.
|
||||
func (r *CacheRecorder) IncCacheEvents(event, name, namespace string) {
|
||||
r.cacheEventsCounter.WithLabelValues(event, name, namespace).Inc()
|
||||
}
|
||||
|
||||
// MustMakeMetrics creates a new CacheRecorder, and registers the metrics collectors in the controller-runtime metrics registry.
|
||||
func MustMakeMetrics() *CacheRecorder {
|
||||
r := NewCacheRecorder()
|
||||
metrics.Registry.MustRegister(r.Collectors()...)
|
||||
|
||||
return r
|
||||
}
|
|
@ -73,12 +73,10 @@ func (b *remoteChartBuilder) Build(_ context.Context, ref Reference, p string, o
|
|||
}
|
||||
|
||||
// Load the repository index if not already present.
|
||||
if b.remote.Index == nil {
|
||||
if err := b.remote.LoadFromCache(); err != nil {
|
||||
if err := b.remote.StrategicallyLoadIndex(); err != nil {
|
||||
err = fmt.Errorf("could not load repository index for remote chart reference: %w", err)
|
||||
return nil, &BuildError{Reason: ErrChartPull, Err: err}
|
||||
}
|
||||
}
|
||||
|
||||
// Get the current version for the RemoteReference
|
||||
cv, err := b.remote.Get(remoteRef.Name, remoteRef.Version)
|
||||
|
|
|
@ -97,6 +97,9 @@ func NewDependencyManager(opts ...DependencyManagerOption) *DependencyManager {
|
|||
func (dm *DependencyManager) Clear() []error {
|
||||
var errs []error
|
||||
for _, v := range dm.repositories {
|
||||
if err := v.CacheIndexInMemory(); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
v.Unload()
|
||||
if err := v.RemoveCache(); err != nil {
|
||||
errs = append(errs, err)
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Masterminds/semver/v3"
|
||||
"helm.sh/helm/v3/pkg/getter"
|
||||
|
@ -38,6 +39,7 @@ import (
|
|||
|
||||
"github.com/fluxcd/pkg/version"
|
||||
|
||||
"github.com/fluxcd/source-controller/internal/cache"
|
||||
"github.com/fluxcd/source-controller/internal/helm"
|
||||
"github.com/fluxcd/source-controller/internal/transport"
|
||||
)
|
||||
|
@ -70,13 +72,52 @@ type ChartRepository struct {
|
|||
tlsConfig *tls.Config
|
||||
|
||||
*sync.RWMutex
|
||||
|
||||
cacheInfo
|
||||
}
|
||||
|
||||
type cacheInfo struct {
|
||||
// In memory cache of the index.yaml file.
|
||||
IndexCache *cache.Cache
|
||||
// IndexKey is the cache key for the index.yaml file.
|
||||
IndexKey string
|
||||
// IndexTTL is the cache TTL for the index.yaml file.
|
||||
IndexTTL time.Duration
|
||||
// RecordIndexCacheMetric records the cache hit/miss metrics for the index.yaml file.
|
||||
RecordIndexCacheMetric RecordMetricsFunc
|
||||
}
|
||||
|
||||
// ChartRepositoryOptions is a function that can be passed to NewChartRepository
|
||||
// to configure a ChartRepository.
|
||||
type ChartRepositoryOption func(*ChartRepository) error
|
||||
|
||||
// RecordMetricsFunc is a function that records metrics.
|
||||
type RecordMetricsFunc func(event string)
|
||||
|
||||
// WithMemoryCache returns a ChartRepositoryOptions that will enable the
|
||||
// ChartRepository to cache the index.yaml file in memory.
|
||||
// The cache key have to be safe in multi-tenancy environments,
|
||||
// as otherwise it could be used as a vector to bypass the helm repository's authentication.
|
||||
func WithMemoryCache(key string, c *cache.Cache, ttl time.Duration, rec RecordMetricsFunc) ChartRepositoryOption {
|
||||
return func(r *ChartRepository) error {
|
||||
if c != nil {
|
||||
if key == "" {
|
||||
return errors.New("cache key cannot be empty")
|
||||
}
|
||||
}
|
||||
r.IndexCache = c
|
||||
r.IndexKey = key
|
||||
r.IndexTTL = ttl
|
||||
r.RecordIndexCacheMetric = rec
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewChartRepository constructs and returns a new ChartRepository with
|
||||
// the ChartRepository.Client configured to the getter.Getter for the
|
||||
// repository URL scheme. It returns an error on URL parsing failures,
|
||||
// or if there is no getter available for the scheme.
|
||||
func NewChartRepository(repositoryURL, cachePath string, providers getter.Providers, tlsConfig *tls.Config, opts []getter.Option) (*ChartRepository, error) {
|
||||
func NewChartRepository(repositoryURL, cachePath string, providers getter.Providers, tlsConfig *tls.Config, getterOpts []getter.Option, chartRepoOpts ...ChartRepositoryOption) (*ChartRepository, error) {
|
||||
u, err := url.Parse(repositoryURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -90,8 +131,15 @@ func NewChartRepository(repositoryURL, cachePath string, providers getter.Provid
|
|||
r.URL = repositoryURL
|
||||
r.CachePath = cachePath
|
||||
r.Client = c
|
||||
r.Options = opts
|
||||
r.Options = getterOpts
|
||||
r.tlsConfig = tlsConfig
|
||||
|
||||
for _, opt := range chartRepoOpts {
|
||||
if err := opt(r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
|
@ -292,14 +340,39 @@ func (r *ChartRepository) CacheIndex() (string, error) {
|
|||
return hex.EncodeToString(h.Sum(nil)), nil
|
||||
}
|
||||
|
||||
// StrategicallyLoadIndex lazy-loads the Index from CachePath using
|
||||
// LoadFromCache if it does not HasIndex.
|
||||
// CacheIndexInMemory attempts to cache the index in memory.
|
||||
// It returns an error if it fails.
|
||||
// The cache key have to be safe in multi-tenancy environments,
|
||||
// as otherwise it could be used as a vector to bypass the helm repository's authentication.
|
||||
func (r *ChartRepository) CacheIndexInMemory() error {
|
||||
// Cache the index if it was successfully retrieved
|
||||
// and the chart was successfully built
|
||||
if r.IndexCache != nil && r.Index != nil {
|
||||
err := r.IndexCache.Set(r.IndexKey, r.Index, r.IndexTTL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StrategicallyLoadIndex lazy-loads the Index
|
||||
// first from Indexcache,
|
||||
// then from CachePath using oadFromCache if it does not HasIndex.
|
||||
// If not HasCacheFile, a cache attempt is made using CacheIndex
|
||||
// before continuing to load.
|
||||
func (r *ChartRepository) StrategicallyLoadIndex() (err error) {
|
||||
if r.HasIndex() {
|
||||
return
|
||||
}
|
||||
|
||||
if r.IndexCache != nil {
|
||||
if found := r.LoadFromMemCache(); found {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if !r.HasCacheFile() {
|
||||
if _, err = r.CacheIndex(); err != nil {
|
||||
err = fmt.Errorf("failed to strategically load index: %w", err)
|
||||
|
@ -313,6 +386,28 @@ func (r *ChartRepository) StrategicallyLoadIndex() (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// LoadFromMemCache attempts to load the Index from the provided cache.
|
||||
// It returns true if the Index was found in the cache, and false otherwise.
|
||||
func (r *ChartRepository) LoadFromMemCache() bool {
|
||||
if index, found := r.IndexCache.Get(r.IndexKey); found {
|
||||
r.Lock()
|
||||
r.Index = index.(*repo.IndexFile)
|
||||
r.Unlock()
|
||||
|
||||
// record the cache hit
|
||||
if r.RecordIndexCacheMetric != nil {
|
||||
r.RecordIndexCacheMetric(cache.CacheEventTypeHit)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// record the cache miss
|
||||
if r.RecordIndexCacheMetric != nil {
|
||||
r.RecordIndexCacheMetric(cache.CacheEventTypeMiss)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// LoadFromCache attempts to load the Index from the configured CachePath.
|
||||
// It returns an error if no CachePath is set, or if the load failed.
|
||||
func (r *ChartRepository) LoadFromCache() error {
|
||||
|
@ -375,6 +470,14 @@ func (r *ChartRepository) Unload() {
|
|||
r.Index = nil
|
||||
}
|
||||
|
||||
// SetMemCache sets the cache to use for this repository.
|
||||
func (r *ChartRepository) SetMemCache(key string, c *cache.Cache, ttl time.Duration, rec RecordMetricsFunc) {
|
||||
r.IndexKey = key
|
||||
r.IndexCache = c
|
||||
r.IndexTTL = ttl
|
||||
r.RecordIndexCacheMetric = rec
|
||||
}
|
||||
|
||||
// RemoveCache removes the CachePath if Cached.
|
||||
func (r *ChartRepository) RemoveCache() error {
|
||||
if r == nil {
|
||||
|
|
4
main.go
4
main.go
|
@ -233,6 +233,9 @@ func main() {
|
|||
|
||||
c = cache.New(helmCacheMaxSize, interval)
|
||||
}
|
||||
|
||||
cacheRecorder := cache.MustMakeMetrics()
|
||||
|
||||
if err = (&controllers.HelmChartReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Storage: storage,
|
||||
|
@ -242,6 +245,7 @@ func main() {
|
|||
ControllerName: controllerName,
|
||||
Cache: c,
|
||||
TTL: ttl,
|
||||
CacheRecorder: cacheRecorder,
|
||||
}).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{
|
||||
MaxConcurrentReconciles: concurrent,
|
||||
RateLimiter: helper.GetRateLimiter(rateLimiterOptions),
|
||||
|
|
Loading…
Reference in New Issue