Garbage collect with provided retention options.

Introduce two new flags to configure the TTL of an artifact and the
maximum number of files to retain for an artifact. Modify the GC
process to honor these options and use timeouts to prevent the
controller from hanging.
This helps in situations where the source-controller has already
garbage collected the current artifact but the advertised artifact URL
is still the same, which leads to the server returning a 404.

Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>
Sanskar Jaiswal 2022-04-07 16:12:53 +05:30
parent 62604a2206
commit f8c27a85dd
13 changed files with 665 additions and 92 deletions
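
Before the per-file diffs, a note on the wiring: the two retention options travel from new CLI flags (see main.go below) into the Storage constructor. A minimal sketch under assumed values; only the NewStorage signature and the flag defaults come from this commit, the path and hostname are placeholders:

    // Placeholder path and hostname; retention arguments mirror the flag defaults.
    storage, err := controllers.NewStorage("/data", "source-controller.flux-system.svc",
        60*time.Second, // artifact-retention-ttl: age after which an artifact may be collected
        2)              // artifact-retention-records: max artifacts kept after a collection
    if err != nil {
        setupLog.Error(err, "unable to initialise storage")
    }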

View File

@ -632,14 +632,19 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc
return nil
}
if obj.GetArtifact() != nil {
-if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
-return &serror.Event{
-Err: fmt.Errorf("garbage collection of old artifacts failed: %s", err),
+delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5)
+if err != nil {
+e := &serror.Event{
+Err: fmt.Errorf("garbage collection of artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
-} else if len(deleted) > 0 {
+r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
+return e
+}
+if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
-"garbage collected old artifacts")
+fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
+return nil
}
}
return nil
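
Each reconciler wraps this call in a context timeout at its reconcileStorage call site. The HelmChart and HelmRepository hunks below show that guard explicitly; it is assumed here that the Bucket and GitRepository call sites gained the same lines, which this extract does not show:

    // Garbage collect previous advertised artifact(s) from storage
    // Abort if it takes more than 5 seconds.
    ctx, cancel := context.WithTimeout(ctx, time.Second*5)
    defer cancel()
    _ = r.garbageCollect(ctx, obj)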

View File

@ -176,7 +176,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
{
name: "garbage collects",
beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
-revisions := []string{"a", "b", "c"}
+revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
@ -186,26 +186,30 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
-if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
+if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil {
return err
}
+if n != len(revisions)-1 {
+time.Sleep(time.Second * 1)
+}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
-want: sreconcile.ResultSuccess,
assertArtifact: &sourcev1.Artifact{
-Path: "/reconcile-storage/c.txt",
-Revision: "c",
-Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6",
-URL: testStorage.Hostname + "/reconcile-storage/c.txt",
-Size: int64p(int64(len("c"))),
+Path: "/reconcile-storage/d.txt",
+Revision: "d",
+Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4",
+URL: testStorage.Hostname + "/reconcile-storage/d.txt",
+Size: int64p(int64(len("d"))),
},
assertPaths: []string{
+"/reconcile-storage/d.txt",
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
},
+want: sreconcile.ResultSuccess,
},
{
name: "notices missing artifact in storage",
@ -237,7 +241,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
-if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil {
+if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil {
return err
}
return nil
@ -259,6 +263,10 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
defer func() {
g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed())
}()
r := &BucketReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,

View File

@ -708,13 +708,19 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc
return nil
}
if obj.GetArtifact() != nil {
-if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
-return &serror.Event{
-Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
+delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5)
+if err != nil {
+e := &serror.Event{
+Err: fmt.Errorf("garbage collection of artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
-} else if len(deleted) > 0 {
+r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
+return e
+}
+if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
-"garbage collected old artifacts")
+fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
+return nil
}
}
return nil

View File

@ -1104,6 +1104,148 @@ func TestGitRepositoryReconciler_reconcileInclude(t *testing.T) {
}
}
func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) {
tests := []struct {
name string
beforeFunc func(obj *sourcev1.GitRepository, storage *Storage) error
want sreconcile.Result
wantErr bool
assertArtifact *sourcev1.Artifact
assertConditions []metav1.Condition
assertPaths []string
}{
{
name: "garbage collects",
beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error {
revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/reconcile-storage/%s.txt", v),
Revision: v,
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil {
return err
}
if n != len(revisions)-1 {
time.Sleep(time.Second * 1)
}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/d.txt",
Revision: "d",
Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4",
URL: testStorage.Hostname + "/reconcile-storage/d.txt",
Size: int64p(int64(len("d"))),
},
assertPaths: []string{
"/reconcile-storage/d.txt",
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
},
want: sreconcile.ResultSuccess,
},
{
name: "notices missing artifact in storage",
beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error {
obj.Status.Artifact = &sourcev1.Artifact{
Path: "/reconcile-storage/invalid.txt",
Revision: "e",
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
want: sreconcile.ResultSuccess,
assertPaths: []string{
"!/reconcile-storage/invalid.txt",
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, "NoArtifact", "no artifact for resource in storage"),
},
},
{
name: "updates hostname on diff from current",
beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error {
obj.Status.Artifact = &sourcev1.Artifact{
Path: "/reconcile-storage/hostname.txt",
Revision: "f",
Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80",
URL: "http://outdated.com/reconcile-storage/hostname.txt",
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil {
return err
}
return nil
},
want: sreconcile.ResultSuccess,
assertPaths: []string{
"/reconcile-storage/hostname.txt",
},
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/hostname.txt",
Revision: "f",
Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80",
URL: testStorage.Hostname + "/reconcile-storage/hostname.txt",
Size: int64p(int64(len("file"))),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
defer func() {
g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed())
}()
r := &GitRepositoryReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
}
obj := &sourcev1.GitRepository{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
},
}
if tt.beforeFunc != nil {
g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
}
var c *git.Commit
var as artifactSet
got, err := r.reconcileStorage(context.TODO(), obj, c, &as, "")
g.Expect(err != nil).To(Equal(tt.wantErr))
g.Expect(got).To(Equal(tt.want))
g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact))
if tt.assertArtifact != nil && tt.assertArtifact.URL != "" {
g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL))
}
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
for _, p := range tt.assertPaths {
absoluteP := filepath.Join(testStorage.BasePath, p)
if !strings.HasPrefix(p, "!") {
g.Expect(absoluteP).To(BeAnExistingFile())
continue
}
g.Expect(absoluteP).NotTo(BeAnExistingFile())
}
})
}
}
func TestGitRepositoryReconciler_reconcileDelete(t *testing.T) {
g := NewWithT(t)

View File

@ -285,6 +285,9 @@ func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmC
// they match the Storage server hostname of current runtime.
func (r *HelmChartReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.HelmChart, build *chart.Build) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
// Abort if it takes more than 5 seconds.
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_ = r.garbageCollect(ctx, obj)
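// Note: this outer deadline stacks with the 5s timeout that garbageCollect
// passes to Storage.GarbageCollect, so whichever expires first cancels the GC walk.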
// Determine if the advertised artifact is still in storage
@ -801,14 +804,19 @@ func (r *HelmChartReconciler) garbageCollect(ctx context.Context, obj *sourcev1.
return nil
}
if obj.GetArtifact() != nil {
-if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
-return &serror.Event{
-Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
+delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5)
+if err != nil {
+e := &serror.Event{
+Err: fmt.Errorf("garbage collection of artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
-} else if len(deleted) > 0 {
+r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
+return e
+}
+if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
-"garbage collected old artifacts")
+fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
+return nil
}
}
return nil

View File

@ -177,7 +177,7 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) {
{
name: "garbage collects",
beforeFunc: func(obj *sourcev1.HelmChart, storage *Storage) error {
-revisions := []string{"a", "b", "c"}
+revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
@ -187,21 +187,25 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
-if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
+if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil {
return err
}
+if n != len(revisions)-1 {
+time.Sleep(time.Second * 1)
+}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
assertArtifact: &sourcev1.Artifact{
-Path: "/reconcile-storage/c.txt",
-Revision: "c",
-Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6",
-URL: testStorage.Hostname + "/reconcile-storage/c.txt",
-Size: int64p(int64(len("c"))),
+Path: "/reconcile-storage/d.txt",
+Revision: "d",
+Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4",
+URL: testStorage.Hostname + "/reconcile-storage/d.txt",
+Size: int64p(int64(len("d"))),
},
assertPaths: []string{
+"/reconcile-storage/d.txt",
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
@ -238,7 +242,7 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
-if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil {
+if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil {
return err
}
return nil
@ -260,6 +264,10 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
defer func() {
g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed())
}()
r := &HelmChartReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
@ -303,7 +311,7 @@ func TestHelmChartReconciler_reconcileSource(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
defer os.RemoveAll(tmpDir)
-storage, err := NewStorage(tmpDir, "example.com", timeout)
+storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords)
g.Expect(err).ToNot(HaveOccurred())
gitArtifact := &sourcev1.Artifact{
@ -777,7 +785,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
defer os.RemoveAll(tmpDir)
-storage, err := NewStorage(tmpDir, "example.com", timeout)
+storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords)
g.Expect(err).ToNot(HaveOccurred())
chartsArtifact := &sourcev1.Artifact{

View File

@ -241,6 +241,9 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.
// they match the Storage server hostname of current runtime.
func (r *HelmRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.HelmRepository, _ *sourcev1.Artifact, _ *repository.ChartRepository) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
// Abort if it takes more than 5 seconds.
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_ = r.garbageCollect(ctx, obj)
// Determine if the advertised artifact is still in storage
@ -515,14 +518,19 @@ func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sour
return nil
}
if obj.GetArtifact() != nil {
-if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
-return &serror.Event{
-Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
+delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5)
+if err != nil {
+e := &serror.Event{
+Err: fmt.Errorf("garbage collection of artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
-} else if len(deleted) > 0 {
+r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
+return e
+}
+if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
-"garbage collected old artifacts")
+fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
+return nil
}
}
return nil

View File

@ -24,6 +24,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"
"github.com/darkowlzz/controller-check/status"
"github.com/fluxcd/pkg/apis/meta"
@ -146,7 +147,7 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) {
{
name: "garbage collects",
beforeFunc: func(obj *sourcev1.HelmRepository, storage *Storage) error {
-revisions := []string{"a", "b", "c"}
+revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
@ -156,21 +157,25 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
-if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
+if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil {
return err
}
+if n != len(revisions)-1 {
+time.Sleep(time.Second * 1)
+}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
assertArtifact: &sourcev1.Artifact{
-Path: "/reconcile-storage/c.txt",
-Revision: "c",
-Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6",
-URL: testStorage.Hostname + "/reconcile-storage/c.txt",
-Size: int64p(int64(len("c"))),
+Path: "/reconcile-storage/d.txt",
+Revision: "d",
+Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4",
+URL: testStorage.Hostname + "/reconcile-storage/d.txt",
+Size: int64p(int64(len("d"))),
},
assertPaths: []string{
+"/reconcile-storage/d.txt",
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
@ -207,7 +212,7 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
-if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil {
+if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil {
return err
}
return nil

View File

@ -19,6 +19,7 @@ package controllers
import (
"archive/tar"
"compress/gzip"
"context"
"crypto/sha256"
"fmt"
"hash"
@ -26,21 +27,28 @@ import (
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"time"
securejoin "github.com/cyphar/filepath-securejoin"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
kerrors "k8s.io/apimachinery/pkg/util/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/fluxcd/pkg/lockedfile"
"io/fs"
"github.com/fluxcd/pkg/untar"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/fs"
sourcefs "github.com/fluxcd/source-controller/internal/fs"
"github.com/fluxcd/source-controller/pkg/sourceignore"
)
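// GarbageCountLimit caps how many files a single garbage collection walk may
// visit; getGarbageFiles aborts the walk once this many files have been seen.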
const GarbageCountLimit = 1000
// Storage manages artifacts
type Storage struct {
// BasePath is the local directory path where the source artifacts are stored.
@ -49,19 +57,25 @@ type Storage struct {
// Hostname is the file server host name used to compose the artifacts URIs.
Hostname string `json:"hostname"`
-// Timeout for artifacts operations
-Timeout time.Duration `json:"timeout"`
+// ArtifactRetentionTTL is the duration of time that artifacts will be kept in
+// storage before being garbage collected.
+ArtifactRetentionTTL time.Duration `json:"artifactRetentionTTL"`
+// ArtifactRetentionRecords is the maximum number of artifacts to be kept in
+// storage after a garbage collection.
+ArtifactRetentionRecords int `json:"artifactRetentionRecords"`
}
// NewStorage creates the storage helper for a given path and hostname.
-func NewStorage(basePath string, hostname string, timeout time.Duration) (*Storage, error) {
+func NewStorage(basePath string, hostname string, artifactRetentionTTL time.Duration, artifactRetentionRecords int) (*Storage, error) {
if f, err := os.Stat(basePath); os.IsNotExist(err) || !f.IsDir() {
return nil, fmt.Errorf("invalid dir path: %s", basePath)
}
return &Storage{
BasePath: basePath,
Hostname: hostname,
-Timeout: timeout,
+ArtifactRetentionTTL: artifactRetentionTTL,
+ArtifactRetentionRecords: artifactRetentionRecords,
}, nil
}
@ -145,6 +159,150 @@ func (s *Storage) RemoveAllButCurrent(artifact sourcev1.Artifact) ([]string, err
return deletedFiles, nil
}
// getGarbageFiles returns all files that need to be garbage collected for the given artifact.
// Garbage files are determined based on the below flow (a worked example follows):
// 1. collect all files with an expired ttl
// 2. if we satisfy maxItemsToBeRetained, then return
// 3. else, remove all files until only the latest n files remain, where n=maxItemsToBeRetained
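// For illustration (values from the "delete files based on maxItemsToBeRetained"
// test case in storage_test.go below, timestamps written milliseconds apart):
// given files artifact1..artifact5, oldest to newest, with artifact5 the current
// artifact, ttl = 2m and maxItemsToBeRetained = 2, step 1 collects nothing since
// no file has expired, so step 3 walks the files oldest-first and marks
// artifact1, artifact2 and artifact3, keeping the two newest. The current
// artifact (localPath) is never marked, regardless of its age.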
func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) {
localPath := s.LocalPath(artifact)
dir := filepath.Dir(localPath)
garbageFiles := []string{}
filesWithCreatedTs := make(map[time.Time]string)
// sortedPaths contain all files sorted according to their created ts.
sortedPaths := []string{}
now := time.Now().UTC()
totalFiles := 0
var errors []string
creationTimestamps := []time.Time{}
_ = filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
errors = append(errors, err.Error())
return nil
}
if totalFiles >= totalCountLimit {
return fmt.Errorf("Reached file walking limit, already walked over: %d", totalFiles)
}
info, err := d.Info()
if err != nil {
errors = append(errors, err.Error())
return nil
}
createdAt := info.ModTime().UTC()
diff := now.Sub(createdAt)
// compare the time difference between now and the time at which the file was created
// with the provided ttl. delete if difference is greater than the ttl.
expired := diff > ttl
if !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink {
if path != localPath && expired {
garbageFiles = append(garbageFiles, path)
}
totalFiles += 1
filesWithCreatedTs[createdAt] = path
creationTimestamps = append(creationTimestamps, createdAt)
}
return nil
})
if len(errors) > 0 {
return nil, fmt.Errorf("can't walk over file: %s", strings.Join(errors, ","))
}
// We already collected enough garbage files to satisfy the no. of max
// items that are supposed to be retained, so exit early.
if totalFiles-len(garbageFiles) < maxItemsToBeRetained {
return garbageFiles, nil
}
// sort all timestamps in an ascending order.
sort.Slice(creationTimestamps, func(i, j int) bool { return creationTimestamps[i].Before(creationTimestamps[j]) })
for _, ts := range creationTimestamps {
path, ok := filesWithCreatedTs[ts]
if !ok {
return garbageFiles, fmt.Errorf("failed to fetch file for created ts: %v", ts)
}
sortedPaths = append(sortedPaths, path)
}
var collected int
noOfGarbageFiles := len(garbageFiles)
for _, path := range sortedPaths {
if path != localPath && !stringInSlice(path, garbageFiles) {
// If we previously collected a few garbage files with an expired ttl, then take that into account
// when checking whether we need to remove more files to satisfy the max no. of items allowed
// in the filesystem, along with the no. of files already removed in this loop.
if noOfGarbageFiles > 0 {
if (len(sortedPaths) - collected - len(garbageFiles)) > maxItemsToBeRetained {
garbageFiles = append(garbageFiles, path)
collected += 1
}
} else {
if len(sortedPaths)-collected > maxItemsToBeRetained {
garbageFiles = append(garbageFiles, path)
collected += 1
}
}
}
}
return garbageFiles, nil
}
// GarbageCollect removes all garbage files in the artifact dir according to the provided
// retention options.
func (s *Storage) GarbageCollect(ctx context.Context, artifact sourcev1.Artifact, timeout time.Duration) ([]string, error) {
delFilesChan := make(chan []string)
errChan := make(chan error)
// Abort if it takes more than the provided timeout duration.
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
go func() {
garbageFiles, err := s.getGarbageFiles(artifact, GarbageCountLimit, s.ArtifactRetentionRecords, s.ArtifactRetentionTTL)
if err != nil {
errChan <- err
return
}
var errors []error
var deleted []string
if len(garbageFiles) > 0 {
for _, file := range garbageFiles {
err := os.Remove(file)
if err != nil {
errors = append(errors, err)
} else {
deleted = append(deleted, file)
}
}
}
if len(errors) > 0 {
errChan <- kerrors.NewAggregate(errors)
return
}
delFilesChan <- deleted
}()
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case delFiles := <-delFilesChan:
return delFiles, nil
case err := <-errChan:
return nil, err
}
}
}
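// Example of a call site (hypothetical snippet; the real callers are the
// garbageCollect methods in the reconciler diffs above):
//
//	deleted, err := s.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5)
//	// len(deleted) is the count reported by the GarbageCollectionSucceeded event.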
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
// ArtifactExist returns a boolean indicating whether the v1beta1.Artifact exists in storage and is a regular file.
func (s *Storage) ArtifactExist(artifact sourcev1.Artifact) bool {
fi, err := os.Lstat(s.LocalPath(artifact))
@ -281,7 +439,7 @@ func (s *Storage) Archive(artifact *sourcev1.Artifact, dir string, filter Archiv
return err
}
-if err := fs.RenameWithFallback(tmpName, localPath); err != nil {
+if err := sourcefs.RenameWithFallback(tmpName, localPath); err != nil {
return err
}
@ -323,7 +481,7 @@ func (s *Storage) AtomicWriteFile(artifact *sourcev1.Artifact, reader io.Reader,
return err
}
-if err := fs.RenameWithFallback(tfName, localPath); err != nil {
+if err := sourcefs.RenameWithFallback(tfName, localPath); err != nil {
return err
}
@ -361,7 +519,7 @@ func (s *Storage) Copy(artifact *sourcev1.Artifact, reader io.Reader) (err error
return err
}
-if err := fs.RenameWithFallback(tfName, localPath); err != nil {
+if err := sourcefs.RenameWithFallback(tfName, localPath); err != nil {
return err
}
@ -421,7 +579,7 @@ func (s *Storage) CopyToPath(artifact *sourcev1.Artifact, subPath, toPath string
if err != nil {
return err
}
-if err := fs.RenameWithFallback(fromPath, toPath); err != nil {
+if err := sourcefs.RenameWithFallback(fromPath, toPath); err != nil {
return err
}
return nil

View File

@ -19,11 +19,13 @@ package controllers
import (
"archive/tar"
"compress/gzip"
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"
@ -48,7 +50,7 @@ func TestStorageConstructor(t *testing.T) {
}
t.Cleanup(cleanupStoragePath(dir))
if _, err := NewStorage("/nonexistent", "hostname", time.Minute); err == nil {
if _, err := NewStorage("/nonexistent", "hostname", time.Minute, 2); err == nil {
t.Fatal("nonexistent path was allowable in storage constructor")
}
@ -58,13 +60,13 @@ func TestStorageConstructor(t *testing.T) {
}
f.Close()
-if _, err := NewStorage(f.Name(), "hostname", time.Minute); err == nil {
+if _, err := NewStorage(f.Name(), "hostname", time.Minute, 2); err == nil {
os.Remove(f.Name())
t.Fatal("file path was accepted as basedir")
}
os.Remove(f.Name())
-if _, err := NewStorage(dir, "hostname", time.Minute); err != nil {
+if _, err := NewStorage(dir, "hostname", time.Minute, 2); err != nil {
t.Fatalf("Valid path did not successfully return: %v", err)
}
}
@ -117,7 +119,7 @@ func TestStorage_Archive(t *testing.T) {
}
t.Cleanup(cleanupStoragePath(dir))
-storage, err := NewStorage(dir, "hostname", time.Minute)
+storage, err := NewStorage(dir, "hostname", time.Minute, 2)
if err != nil {
t.Fatalf("error while bootstrapping storage: %v", err)
}
@ -289,7 +291,7 @@ func TestStorageRemoveAllButCurrent(t *testing.T) {
}
t.Cleanup(func() { os.RemoveAll(dir) })
-s, err := NewStorage(dir, "hostname", time.Minute)
+s, err := NewStorage(dir, "hostname", time.Minute, 2)
if err != nil {
t.Fatalf("Valid path did not successfully return: %v", err)
}
@ -305,7 +307,7 @@ func TestStorageRemoveAllButCurrent(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
t.Cleanup(func() { os.RemoveAll(dir) })
-s, err := NewStorage(dir, "hostname", time.Minute)
+s, err := NewStorage(dir, "hostname", time.Minute, 2)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := sourcev1.Artifact{
@ -368,7 +370,7 @@ func TestStorageRemoveAll(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
t.Cleanup(func() { os.RemoveAll(dir) })
-s, err := NewStorage(dir, "hostname", time.Minute)
+s, err := NewStorage(dir, "hostname", time.Minute, 2)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := sourcev1.Artifact{
@ -398,7 +400,7 @@ func TestStorageCopyFromPath(t *testing.T) {
}
t.Cleanup(cleanupStoragePath(dir))
-storage, err := NewStorage(dir, "hostname", time.Minute)
+storage, err := NewStorage(dir, "hostname", time.Minute, 2)
if err != nil {
t.Fatalf("error while bootstrapping storage: %v", err)
}
@ -486,3 +488,218 @@ func TestStorageCopyFromPath(t *testing.T) {
})
}
}
func TestStorage_getGarbageFiles(t *testing.T) {
artifactFolder := path.Join("foo", "bar")
tests := []struct {
name string
artifactPaths []string
createPause time.Duration
ttl time.Duration
maxItemsToBeRetained int
totalCountLimit int
wantDeleted []string
}{
{
name: "delete files based on maxItemsToBeRetained",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
path.Join(artifactFolder, "artifact5.tar.gz"),
},
createPause: time.Millisecond * 10,
ttl: time.Minute * 2,
totalCountLimit: 10,
maxItemsToBeRetained: 2,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
},
},
{
name: "delete files based on ttl",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
path.Join(artifactFolder, "artifact5.tar.gz"),
},
createPause: time.Second * 1,
ttl: time.Second*3 + time.Millisecond*500,
totalCountLimit: 10,
maxItemsToBeRetained: 4,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
},
},
{
name: "delete files based on ttl and maxItemsToBeRetained",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
path.Join(artifactFolder, "artifact5.tar.gz"),
path.Join(artifactFolder, "artifact6.tar.gz"),
},
createPause: time.Second * 1,
ttl: time.Second*5 + time.Millisecond*500,
totalCountLimit: 10,
maxItemsToBeRetained: 4,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
},
},
{
name: "delete files based on ttl and maxItemsToBeRetained and totalCountLimit",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
path.Join(artifactFolder, "artifact5.tar.gz"),
path.Join(artifactFolder, "artifact6.tar.gz"),
},
createPause: time.Millisecond * 500,
ttl: time.Millisecond * 500,
totalCountLimit: 3,
maxItemsToBeRetained: 2,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
dir, err := os.MkdirTemp("", "")
g.Expect(err).ToNot(HaveOccurred())
t.Cleanup(func() { os.RemoveAll(dir) })
s, err := NewStorage(dir, "hostname", tt.ttl, tt.maxItemsToBeRetained)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := sourcev1.Artifact{
Path: tt.artifactPaths[len(tt.artifactPaths)-1],
}
g.Expect(os.MkdirAll(path.Join(dir, artifactFolder), 0o755)).ToNot(HaveOccurred())
for _, artifactPath := range tt.artifactPaths {
f, err := os.Create(path.Join(dir, artifactPath))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(f.Close()).ToNot(HaveOccurred())
time.Sleep(tt.createPause)
}
deletedPaths, err := s.getGarbageFiles(artifact, tt.totalCountLimit, tt.maxItemsToBeRetained, tt.ttl)
g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files")
g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths)))
for _, wantDeletedPath := range tt.wantDeleted {
present := false
for _, deletedPath := range deletedPaths {
if strings.Contains(deletedPath, wantDeletedPath) {
present = true
break
}
}
if !present {
g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath))
}
}
})
}
}
func TestStorage_GarbageCollect(t *testing.T) {
artifactFolder := path.Join("foo", "bar")
tests := []struct {
name string
artifactPaths []string
wantDeleted []string
wantErr string
ctxTimeout time.Duration
}{
{
name: "garbage collects",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
},
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
},
ctxTimeout: time.Second * 1,
},
{
name: "garbage collection fails with context timeout",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
},
wantErr: "context deadline exceeded",
ctxTimeout: time.Nanosecond * 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
dir, err := os.MkdirTemp("", "")
g.Expect(err).ToNot(HaveOccurred())
t.Cleanup(func() { os.RemoveAll(dir) })
s, err := NewStorage(dir, "hostname", time.Second*2, 2)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := sourcev1.Artifact{
Path: tt.artifactPaths[len(tt.artifactPaths)-1],
}
g.Expect(os.MkdirAll(path.Join(dir, artifactFolder), 0o755)).ToNot(HaveOccurred())
for i, artifactPath := range tt.artifactPaths {
f, err := os.Create(path.Join(dir, artifactPath))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(f.Close()).ToNot(HaveOccurred())
if i != len(tt.artifactPaths)-1 {
time.Sleep(time.Second * 1)
}
}
deletedPaths, err := s.GarbageCollect(context.TODO(), artifact, tt.ctxTimeout)
if tt.wantErr == "" {
g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files")
} else {
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring(tt.wantErr))
}
if len(tt.wantDeleted) > 0 {
g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths)))
for _, wantDeletedPath := range tt.wantDeleted {
present := false
for _, deletedPath := range deletedPaths {
if strings.Contains(deletedPath, wantDeletedPath) {
g.Expect(deletedPath).ToNot(BeAnExistingFile())
present = true
break
}
}
if !present {
g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath))
}
}
}
})
}
}

View File

@ -48,6 +48,8 @@ import (
const (
timeout = 10 * time.Second
interval = 1 * time.Second
retentionTTL = 2 * time.Second
retentionRecords = 2
)
var (
@ -181,7 +183,7 @@ func initTestTLS() {
}
func newTestStorage(s *testserver.HTTPServer) (*Storage, error) {
-storage, err := NewStorage(s.Root(), s.URL(), timeout)
+storage, err := NewStorage(s.Root(), s.URL(), retentionTTL, retentionRecords)
if err != nil {
return nil, err
}

main.go
View File

@ -92,6 +92,8 @@ func main() {
helmCacheTTL string
helmCachePurgeInterval string
kexAlgos []string
artifactRetentionTTL time.Duration
artifactRetentionRecords int
)
flag.StringVar(&metricsAddr, "metrics-addr", envOrDefault("METRICS_ADDR", ":8080"),
@ -124,6 +126,10 @@ func main() {
"The interval at which the cache is purged. Valid time units are ns, us (or µs), ms, s, m, h.")
flag.StringSliceVar(&kexAlgos, "ssh-kex-algos", []string{},
"The list of key exchange algorithms to use for ssh connections, arranged from most preferred to the least.")
flag.DurationVar(&artifactRetentionTTL, "artifact-retention-ttl", 60*time.Second,
"The duration of time that artifacts will be kept in storage before being garbage collected.")
flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2,
"The maximum number of artifacts to be kept in storage after a garbage collection.")
clientOptions.BindFlags(flag.CommandLine)
logOptions.BindFlags(flag.CommandLine)
@ -177,7 +183,7 @@ func main() {
if storageAdvAddr == "" {
storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog)
}
-storage := mustInitStorage(storagePath, storageAdvAddr, setupLog)
+storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, setupLog)
setPreferredKexAlgos(kexAlgos)
if err = (&controllers.GitRepositoryReconciler{
@ -283,14 +289,14 @@ func startFileServer(path string, address string, l logr.Logger) {
}
}
-func mustInitStorage(path string, storageAdvAddr string, l logr.Logger) *controllers.Storage {
+func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, l logr.Logger) *controllers.Storage {
if path == "" {
p, _ := os.Getwd()
path = filepath.Join(p, "bin")
os.MkdirAll(path, 0777)
}
-storage, err := controllers.NewStorage(path, storageAdvAddr, 5*time.Minute)
+storage, err := controllers.NewStorage(path, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords)
if err != nil {
l.Error(err, "unable to initialise storage")
os.Exit(1)

View File

@ -174,7 +174,7 @@ func startEnvServer(setupReconcilers func(manager.Manager)) *envtest.Environment
panic(err)
}
defer os.RemoveAll(tmpStoragePath)
-storage, err = controllers.NewStorage(tmpStoragePath, "localhost:5050", time.Second*30)
+storage, err = controllers.NewStorage(tmpStoragePath, "localhost:5050", time.Minute*1, 2)
if err != nil {
panic(err)
}