Merge pull request #626 from souleb/reuse-index-pool

Add optional in-memory cache of HelmRepository index files
This commit is contained in:
Stefan Prodan 2022-04-04 18:05:22 +03:00 committed by GitHub
commit 362bc56bd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 489 additions and 19 deletions

View File

@ -97,4 +97,7 @@ const (
// ArtifactUpToDateReason signals that an existing Artifact is up-to-date
// with the Source.
ArtifactUpToDateReason string = "ArtifactUpToDate"
// CacheOperationFailedReason signals a failure in cache operation.
CacheOperationFailedReason string = "CacheOperationFailed"
)

View File

@ -30,6 +30,7 @@ import (
securejoin "github.com/cyphar/filepath-securejoin"
helmgetter "helm.sh/helm/v3/pkg/getter"
helmrepo "helm.sh/helm/v3/pkg/repo"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -55,6 +56,7 @@ import (
"github.com/fluxcd/pkg/untar"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/cache"
serror "github.com/fluxcd/source-controller/internal/error"
"github.com/fluxcd/source-controller/internal/helm/chart"
"github.com/fluxcd/source-controller/internal/helm/getter"
@ -111,6 +113,9 @@ type HelmChartReconciler struct {
Storage *Storage
Getters helmgetter.Providers
ControllerName string
Cache *cache.Cache
TTL time.Duration
}
func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
@ -456,6 +461,13 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
}
}
// Try to retrieve the repository index from the cache
if r.Cache != nil {
if index, found := r.Cache.Get(r.Storage.LocalPath(*repo.GetArtifact())); found {
chartRepo.Index = index.(*helmrepo.IndexFile)
}
}
// Construct the chart builder with scoped configuration
cb := chart.NewRemoteBuilder(chartRepo)
opts := chart.BuildOptions{
@ -479,6 +491,26 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
return sreconcile.ResultEmpty, err
}
defer func() {
// Cache the index if it was successfully retrieved
// and the chart was successfully built
if r.Cache != nil && chartRepo.Index != nil {
// The cache key has to be safe in multi-tenancy environments,
// as otherwise it could be used as a vector to bypass the helm repository's authentication.
// Using r.Storage.LocalPath(*repo.GetArtifact()) is safe as the path is in the format /<helm-repository-name>/<chart-name>/<filename>.
err := r.Cache.Set(r.Storage.LocalPath(*repo.GetArtifact()), chartRepo.Index, r.TTL)
if err != nil {
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %s", err)
}
}
// Delete the index reference
if chartRepo.Index != nil {
chartRepo.Unload()
}
}()
*b = *build
return sreconcile.ResultSuccess, nil
}

View File

@ -35,6 +35,7 @@ import (
"github.com/fluxcd/pkg/testserver"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/cache"
// +kubebuilder:scaffold:imports
)
@ -126,12 +127,15 @@ func TestMain(m *testing.M) {
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
}
cache := cache.New(5, 1*time.Second)
if err := (&HelmChartReconciler{
Client: testEnv,
EventRecorder: record.NewFakeRecorder(32),
Metrics: testMetricsH,
Getters: testGetters,
Storage: testStorage,
Cache: cache,
TTL: 1 * time.Second,
}).SetupWithManager(testEnv); err != nil {
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
}

View File

@ -390,6 +390,53 @@ Besides being reported in Events, the reconciliation errors are also logged by
the controller. The Flux CLI offers commands for filtering the logs for a
specific HelmChart, e.g. `flux logs --level=error --kind=HelmChart --name=<chart-name>`.
### Improving resource consumption by enabling the cache
When using a `HelmRepository` as Source for a `HelmChart`, the controller loads
the repository index in memory to find the latest version of the chart.
The controller can be configured to cache Helm repository indexes in memory.
The cache is used to avoid loading repository indexes for every `HelmChart`
reconciliation.
The following flags are provided to enable and configure the cache:
- `helm-cache-max-size`: The maximum size of the cache in number of indexes.
If `0`, then the cache is disabled.
- `helm-cache-ttl`: The TTL of an index in the cache.
- `helm-cache-purge-interval`: The interval at which the cache is purged of
expired items.
The caching strategy is to pull a repository index from the cache if it is
available, otherwise to load the index, retrieve and build the chart,
then cache the index. The TTL of a cached index is reset to the
`helm-cache-ttl` value every time the Helm repository index is loaded.
The cache is purged of expired items every `helm-cache-purge-interval`.
When the cache is full, no more items can be added to the cache, and the
source-controller will report a warning event instead.
In order to use the cache, set the related flags in the source-controller
Deployment config:
```yaml
spec:
containers:
- args:
- --watch-all-namespaces
- --log-level=info
- --log-encoding=json
- --enable-leader-election
- --storage-path=/data
- --storage-adv-addr=source-controller.$(RUNTIME_NAMESPACE).svc.cluster.local.
## Helm cache with up to 10 items, i.e. 10 indexes.
- --helm-cache-max-size=10
## TTL of an index is 1 hour.
- --helm-cache-ttl=1h
## Purge expired index every 10 minutes.
- --helm-cache-purge-interval=10m
```
## HelmChart Status
### Artifact

19
internal/cache/LICENSE vendored Normal file
View File

@ -0,0 +1,19 @@
Copyright (c) 2012-2019 Patrick Mylund Nielsen and the go-cache contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

246
internal/cache/cache.go vendored Normal file
View File

@ -0,0 +1,246 @@
// Copyright (c) 2012-2019 Patrick Mylund Nielsen and the go-cache contributors
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// Copyright 2022 The FluxCD contributors. All rights reserved.
// This package provides an in-memory cache
// derived from the https://github.com/patrickmn/go-cache
// package
// It has been modified in order to keep a small set of functions
// and to add a maxItems parameter in order to limit the number of,
// and thus the size of, items in the cache.
package cache
import (
"fmt"
"runtime"
"sync"
"time"
)
// Cache is a thread-safe in-memory key/value store.
// It embeds a pointer to the unexported cache implementation, so every
// method defined on *cache can be called directly on a Cache value.
type Cache struct {
	*cache
}
// Item is an item stored in the cache.
type Item struct {
	// Object is the item's value.
	Object interface{}
	// Expiration is the item's expiration time, expressed in Unix
	// nanoseconds. A value of zero means the item never expires.
	Expiration int64
}
// cache is the implementation behind Cache: a map of items bounded by
// MaxItems and protected by a read/write mutex.
type cache struct {
	// Items holds the elements in the cache.
	Items map[string]Item
	// MaxItems is the maximum number of items the cache can hold.
	MaxItems int
	// mu guards access to Items.
	mu sync.RWMutex
	// janitor holds the settings of the background purge loop that New
	// starts when it is given a positive interval.
	janitor *janitor
}
// ItemCount reports how many entries the cache currently holds.
// Entries that have expired but have not been purged yet are counted too.
func (c *cache) ItemCount() int {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return len(c.Items)
}
// set stores value under key, overwriting any previous entry.
// A positive expiration is converted to an absolute deadline in Unix
// nanoseconds; otherwise the deadline stays zero and the entry never expires.
// It performs no locking itself: Set and Add call it with c.mu held.
func (c *cache) set(key string, value interface{}, expiration time.Duration) {
	entry := Item{Object: value}
	if expiration > 0 {
		entry.Expiration = time.Now().Add(expiration).UnixNano()
	}
	c.Items[key] = entry
}
// Set stores value under key, replacing whatever was there before.
// An expiration of zero means the item never expires.
// Overwriting an existing key always succeeds. A new key is accepted only
// while the cache holds fewer than MaxItems entries; otherwise Set returns
// an error.
func (c *cache) Set(key string, value interface{}, expiration time.Duration) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	_, exists := c.Items[key]
	hasRoom := c.MaxItems > 0 && len(c.Items) < c.MaxItems
	if !exists && !hasRoom {
		return fmt.Errorf("Cache is full")
	}

	c.set(key, value, expiration)
	return nil
}
// Add stores value under key only if the key is not already in use.
// To overwrite a live item, use Set.
//
// An entry that has expired but has not been purged yet does not count as
// existing: Get and HasExpired already treat such a key as gone, so Add
// replaces it instead of reporting a conflict.
//
// It returns an error when a live item already exists for key, or when the
// key is new and the cache already holds MaxItems entries.
func (c *cache) Add(key string, value interface{}, expiration time.Duration) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if item, found := c.Items[key]; found {
		expired := item.Expiration > 0 && item.Expiration < time.Now().UnixNano()
		if !expired {
			return fmt.Errorf("Item %s already exists", key)
		}
		// Replacing an expired entry does not grow the map, so the
		// MaxItems check does not apply here.
		c.set(key, value, expiration)
		return nil
	}

	if c.MaxItems > 0 && len(c.Items) < c.MaxItems {
		c.set(key, value, expiration)
		return nil
	}
	return fmt.Errorf("Cache is full")
}
// Get looks up key and returns the stored value together with true.
// It returns nil and false when the key is absent or when the item's
// expiration time has already passed.
func (c *cache) Get(key string) (interface{}, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	entry, ok := c.Items[key]
	switch {
	case !ok:
		return nil, false
	case entry.Expiration > 0 && entry.Expiration < time.Now().UnixNano():
		// Expired but not purged yet: report it as missing.
		return nil, false
	}
	return entry.Object, true
}
// Delete removes the entry stored under key.
// It is a no-op when the key is not present.
func (c *cache) Delete(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	delete(c.Items, key)
}
// Clear removes every item from the cache.
// It swaps in a freshly allocated map rather than deleting keys one by one,
// so the old map and the items it held become eligible for garbage collection.
func (c *cache) Clear() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.Items = map[string]Item{}
}
// HasExpired reports whether the item stored under key is no longer usable.
// It returns true when the key is absent or when the item's expiration time
// has passed, and false for live items and for items that never expire.
func (c *cache) HasExpired(key string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	entry, present := c.Items[key]
	if !present {
		return true
	}
	return entry.Expiration > 0 && entry.Expiration < time.Now().UnixNano()
}
// SetExpiration sets the expiration of the item stored under key to the
// current time plus the given duration.
// It does nothing if the key is not in the cache.
func (c *cache) SetExpiration(key string, expiration time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()

	item, ok := c.Items[key]
	if !ok {
		return
	}
	item.Expiration = time.Now().Add(expiration).UnixNano()
	// Items are stored in the map by value, so the lookup above yields a
	// copy. Write it back, otherwise the new deadline is silently lost.
	c.Items[key] = item
}
// GetExpiration returns the time remaining before the item stored under key
// expires.
// It returns zero if the key is not in the cache, if the item has already
// expired, or if the item has no expiration set.
func (c *cache) GetExpiration(key string) time.Duration {
	c.mu.RLock()
	defer c.mu.RUnlock()

	item, ok := c.Items[key]
	if !ok || item.Expiration <= 0 {
		// An Expiration of zero means "never expires"; subtracting the
		// current time from it would yield a meaningless negative duration.
		return 0
	}

	// Read the clock once so the expiry check and the subtraction agree and
	// the result can never be negative.
	remaining := item.Expiration - time.Now().UnixNano()
	if remaining <= 0 {
		return 0
	}
	return time.Duration(remaining)
}
// DeleteExpired deletes all expired items from the cache.
// Items without an expiration (Expiration of zero) are never removed.
func (c *cache) DeleteExpired() {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Read the clock once for the whole sweep instead of once per item, so
	// the write lock is held no longer than necessary.
	now := time.Now().UnixNano()
	for k, v := range c.Items {
		if v.Expiration > 0 && v.Expiration < now {
			delete(c.Items, k)
		}
	}
}
// janitor holds the settings of the background loop that purges expired
// items from a cache.
type janitor struct {
	// interval is the period between two purges.
	interval time.Duration
	// stop receives a value when the purge loop should terminate.
	stop chan bool
}
// run purges expired items from c once per j.interval until a value is
// received on j.stop. It blocks, so New starts it in its own goroutine.
func (j *janitor) run(c *cache) {
	ticker := time.NewTicker(j.interval)
	// Release the ticker's resources whichever way the loop ends.
	defer ticker.Stop()

	for {
		select {
		case <-j.stop:
			return
		case <-ticker.C:
			c.DeleteExpired()
		}
	}
}
// stopJanitor tells the janitor goroutine of c to terminate.
// New registers it as the finalizer of the returned *Cache.
// The stop channel is unbuffered, so the send blocks until the janitor's
// run loop receives the value.
func stopJanitor(c *Cache) {
	c.janitor.stop <- true
}
// New returns a cache that accepts at most maxItems entries.
// When interval is positive, a background goroutine purges expired items
// every interval, and a finalizer on the returned *Cache stops that goroutine
// once the *Cache is garbage collected. With a non-positive interval no
// goroutine is started and expired items are only removed by explicit calls.
func New(maxItems int, interval time.Duration) *Cache {
	inner := &cache{
		Items:    make(map[string]Item),
		MaxItems: maxItems,
		janitor: &janitor{
			interval: interval,
			stop:     make(chan bool),
		},
	}
	wrapper := &Cache{inner}

	if interval <= 0 {
		return wrapper
	}

	// The goroutine references only the inner cache, so the wrapper can
	// still become unreachable and trigger the finalizer.
	go inner.janitor.run(inner)
	runtime.SetFinalizer(wrapper, stopJanitor)
	return wrapper
}

87
internal/cache/cache_test.go vendored Normal file
View File

@ -0,0 +1,87 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"testing"
"time"
. "github.com/onsi/gomega"
)
// TestCache exercises the basic cache operations in sequence: lookups on an
// empty cache, Add up to the item limit, rejection of an extra item,
// replacement through Set, and expiry of an item with a TTL.
// It sleeps for three seconds in order to observe the expiry.
func TestCache(t *testing.T) {
	g := NewWithT(t)
	// Create a cache that can hold 2 items and has no periodic cleanup.
	cache := New(2, 0)

	// A lookup on the empty cache must miss.
	if _, found := cache.Get("key1"); found {
		t.Error("Item should not be found")
	}

	// Add a first item without expiration.
	err := cache.Add("key1", "value1", 0)
	g.Expect(err).ToNot(HaveOccurred())

	// The item just added must be retrievable.
	item, found := cache.Get("key1")
	g.Expect(found).To(BeTrue())
	g.Expect(item).To(Equal("value1"))

	// Add a second item; the cache is now at its limit of 2 items.
	err = cache.Add("key2", "value2", 0)
	g.Expect(err).ToNot(HaveOccurred())
	g.Expect(cache.ItemCount()).To(Equal(2))

	// The second item must be retrievable as well.
	item, found = cache.Get("key2")
	g.Expect(found).To(BeTrue())
	g.Expect(item).To(Equal("value2"))

	// Adding a third key must fail because the cache is full.
	err = cache.Add("key3", "value3", 0)
	g.Expect(err).To(HaveOccurred())

	// Replacing an existing key with Set must succeed even when full.
	err = cache.Set("key2", "value3", 0)
	g.Expect(err).ToNot(HaveOccurred())

	// The replaced key must now return the new value.
	item, found = cache.Get("key2")
	g.Expect(found).To(BeTrue())
	g.Expect(item).To(Equal("value3"))

	// Create a new cache with a cleanup interval of 1 second.
	cache = New(2, 1*time.Second)

	// Add an item that expires after 2 seconds.
	err = cache.Add("key1", "value1", 2*time.Second)
	g.Expect(err).ToNot(HaveOccurred())

	// Before the TTL elapses the item must still be retrievable.
	item, found = cache.Get("key1")
	g.Expect(found).To(BeTrue())
	g.Expect(item).To(Equal("value1"))

	// Wait longer than the TTL so the item expires.
	time.Sleep(3 * time.Second)

	// The expired item must no longer be returned.
	item, found = cache.Get("key1")
	g.Expect(found).To(BeFalse())
	g.Expect(item).To(BeNil())
}

View File

@ -72,11 +72,13 @@ func (b *remoteChartBuilder) Build(_ context.Context, ref Reference, p string, o
return nil, &BuildError{Reason: ErrChartReference, Err: err}
}
if err := b.remote.LoadFromCache(); err != nil {
err = fmt.Errorf("could not load repository index for remote chart reference: %w", err)
return nil, &BuildError{Reason: ErrChartPull, Err: err}
// Load the repository index if not already present.
if b.remote.Index == nil {
if err := b.remote.LoadFromCache(); err != nil {
err = fmt.Errorf("could not load repository index for remote chart reference: %w", err)
return nil, &BuildError{Reason: ErrChartPull, Err: err}
}
}
defer b.remote.Unload()
// Get the current version for the RemoteReference
cv, err := b.remote.Get(remoteRef.Name, remoteRef.Version)

60
main.go
View File

@ -44,6 +44,7 @@ import (
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/controllers"
"github.com/fluxcd/source-controller/internal/cache"
"github.com/fluxcd/source-controller/internal/helm"
"github.com/fluxcd/source-controller/pkg/git/libgit2/managed"
// +kubebuilder:scaffold:imports
@ -71,21 +72,24 @@ func init() {
func main() {
var (
metricsAddr string
eventsAddr string
healthAddr string
storagePath string
storageAddr string
storageAdvAddr string
concurrent int
requeueDependency time.Duration
watchAllNamespaces bool
helmIndexLimit int64
helmChartLimit int64
helmChartFileLimit int64
clientOptions client.Options
logOptions logger.Options
leaderElectionOptions leaderelection.Options
metricsAddr string
eventsAddr string
healthAddr string
storagePath string
storageAddr string
storageAdvAddr string
concurrent int
requeueDependency time.Duration
watchAllNamespaces bool
helmIndexLimit int64
helmChartLimit int64
helmChartFileLimit int64
clientOptions client.Options
logOptions logger.Options
leaderElectionOptions leaderelection.Options
helmCacheMaxSize int
helmCacheTTL string
helmCachePurgeInterval string
)
flag.StringVar(&metricsAddr, "metrics-addr", envOrDefault("METRICS_ADDR", ":8080"),
@ -110,6 +114,12 @@ func main() {
"The max allowed size in bytes of a file in a Helm chart.")
flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second,
"The interval at which failing dependencies are reevaluated.")
flag.IntVar(&helmCacheMaxSize, "helm-cache-max-size", 0,
"The maximum size of the cache in number of indexes.")
flag.StringVar(&helmCacheTTL, "helm-cache-ttl", "15m",
"The TTL of an index in the cache. Valid time units are ns, us (or µs), ms, s, m, h.")
flag.StringVar(&helmCachePurgeInterval, "helm-cache-purge-interval", "1m",
"The interval at which the cache is purged. Valid time units are ns, us (or µs), ms, s, m, h.")
clientOptions.BindFlags(flag.CommandLine)
logOptions.BindFlags(flag.CommandLine)
@ -191,6 +201,24 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind)
os.Exit(1)
}
var c *cache.Cache
var ttl time.Duration
if helmCacheMaxSize > 0 {
interval, err := time.ParseDuration(helmCachePurgeInterval)
if err != nil {
setupLog.Error(err, "unable to parse cache purge interval")
os.Exit(1)
}
ttl, err = time.ParseDuration(helmCacheTTL)
if err != nil {
setupLog.Error(err, "unable to parse cache TTL")
os.Exit(1)
}
c = cache.New(helmCacheMaxSize, interval)
}
if err = (&controllers.HelmChartReconciler{
Client: mgr.GetClient(),
Storage: storage,
@ -198,6 +226,8 @@ func main() {
EventRecorder: eventRecorder,
Metrics: metricsH,
ControllerName: controllerName,
Cache: c,
TTL: ttl,
}).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {