Merge pull request #638 from aryan9600/garbage-retention

Garbage collect with provided retention options
Stefan Prodan 2022-04-07 16:33:51 +03:00 committed by GitHub
commit 741033e5bf
13 changed files with 665 additions and 92 deletions

View File

@ -632,14 +632,19 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc
return nil
}
if obj.GetArtifact() != nil {
-if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
-return &serror.Event{
-Err: fmt.Errorf("garbage collection of old artifacts failed: %s", err),
+delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5)
+if err != nil {
+e := &serror.Event{
+Err: fmt.Errorf("garbage collection of artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
-} else if len(deleted) > 0 {
+r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
+return e
+}
+if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
-"garbage collected old artifacts")
+fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
+return nil
}
}
return nil

View File

@ -176,7 +176,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
{
name: "garbage collects",
beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
revisions := []string{"a", "b", "c"}
revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
@ -186,26 +186,30 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
-if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
+if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil {
return err
}
+if n != len(revisions)-1 {
+time.Sleep(time.Second * 1)
+}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
-want: sreconcile.ResultSuccess,
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/c.txt",
Revision: "c",
Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6",
URL: testStorage.Hostname + "/reconcile-storage/c.txt",
Size: int64p(int64(len("c"))),
Path: "/reconcile-storage/d.txt",
Revision: "d",
Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4",
URL: testStorage.Hostname + "/reconcile-storage/d.txt",
Size: int64p(int64(len("d"))),
},
assertPaths: []string{
"/reconcile-storage/d.txt",
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
},
+want: sreconcile.ResultSuccess,
},
{
name: "notices missing artifact in storage",
@ -237,7 +241,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil {
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil {
return err
}
return nil
@ -259,6 +263,10 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
defer func() {
g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed())
}()
r := &BucketReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,

View File

@ -708,13 +708,19 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc
return nil
}
if obj.GetArtifact() != nil {
-if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
-return &serror.Event{
-Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
+delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5)
+if err != nil {
+e := &serror.Event{
+Err: fmt.Errorf("garbage collection of artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
-} else if len(deleted) > 0 {
+r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
+return e
+}
+if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
-"garbage collected old artifacts")
+fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
+return nil
}
}
return nil

View File

@ -1104,6 +1104,148 @@ func TestGitRepositoryReconciler_reconcileInclude(t *testing.T) {
}
}
func TestGitRepositoryReconciler_reconcileStorage(t *testing.T) {
tests := []struct {
name string
beforeFunc func(obj *sourcev1.GitRepository, storage *Storage) error
want sreconcile.Result
wantErr bool
assertArtifact *sourcev1.Artifact
assertConditions []metav1.Condition
assertPaths []string
}{
{
name: "garbage collects",
beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error {
revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/reconcile-storage/%s.txt", v),
Revision: v,
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil {
return err
}
if n != len(revisions)-1 {
time.Sleep(time.Second * 1)
}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/d.txt",
Revision: "d",
Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4",
URL: testStorage.Hostname + "/reconcile-storage/d.txt",
Size: int64p(int64(len("d"))),
},
assertPaths: []string{
"/reconcile-storage/d.txt",
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
},
want: sreconcile.ResultSuccess,
},
{
name: "notices missing artifact in storage",
beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error {
obj.Status.Artifact = &sourcev1.Artifact{
Path: "/reconcile-storage/invalid.txt",
Revision: "e",
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
want: sreconcile.ResultSuccess,
assertPaths: []string{
"!/reconcile-storage/invalid.txt",
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, "NoArtifact", "no artifact for resource in storage"),
},
},
{
name: "updates hostname on diff from current",
beforeFunc: func(obj *sourcev1.GitRepository, storage *Storage) error {
obj.Status.Artifact = &sourcev1.Artifact{
Path: "/reconcile-storage/hostname.txt",
Revision: "f",
Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80",
URL: "http://outdated.com/reconcile-storage/hostname.txt",
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil {
return err
}
return nil
},
want: sreconcile.ResultSuccess,
assertPaths: []string{
"/reconcile-storage/hostname.txt",
},
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/hostname.txt",
Revision: "f",
Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80",
URL: testStorage.Hostname + "/reconcile-storage/hostname.txt",
Size: int64p(int64(len("file"))),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
defer func() {
g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed())
}()
r := &GitRepositoryReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
}
obj := &sourcev1.GitRepository{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
},
}
if tt.beforeFunc != nil {
g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
}
var c *git.Commit
var as artifactSet
got, err := r.reconcileStorage(context.TODO(), obj, c, &as, "")
g.Expect(err != nil).To(Equal(tt.wantErr))
g.Expect(got).To(Equal(tt.want))
g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact))
if tt.assertArtifact != nil && tt.assertArtifact.URL != "" {
g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL))
}
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
for _, p := range tt.assertPaths {
absoluteP := filepath.Join(testStorage.BasePath, p)
if !strings.HasPrefix(p, "!") {
g.Expect(absoluteP).To(BeAnExistingFile())
continue
}
g.Expect(absoluteP).NotTo(BeAnExistingFile())
}
})
}
}
func TestGitRepositoryReconciler_reconcileDelete(t *testing.T) {
g := NewWithT(t)

View File

@ -285,6 +285,9 @@ func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmC
// they match the Storage server hostname of current runtime.
func (r *HelmChartReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.HelmChart, build *chart.Build) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
// Abort if it takes more than 5 seconds.
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_ = r.garbageCollect(ctx, obj)
// Determine if the advertised artifact is still in storage
@ -801,14 +804,19 @@ func (r *HelmChartReconciler) garbageCollect(ctx context.Context, obj *sourcev1.
return nil
}
if obj.GetArtifact() != nil {
-if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
-return &serror.Event{
-Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
+delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5)
+if err != nil {
+e := &serror.Event{
+Err: fmt.Errorf("garbage collection of artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
-} else if len(deleted) > 0 {
+r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
+return e
+}
+if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
-"garbage collected old artifacts")
+fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
+return nil
}
}
return nil

View File

@ -177,7 +177,7 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) {
{
name: "garbage collects",
beforeFunc: func(obj *sourcev1.HelmChart, storage *Storage) error {
revisions := []string{"a", "b", "c"}
revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
@ -187,21 +187,25 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
-if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
+if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil {
return err
}
+if n != len(revisions)-1 {
+time.Sleep(time.Second * 1)
+}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/c.txt",
Revision: "c",
Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6",
URL: testStorage.Hostname + "/reconcile-storage/c.txt",
Size: int64p(int64(len("c"))),
Path: "/reconcile-storage/d.txt",
Revision: "d",
Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4",
URL: testStorage.Hostname + "/reconcile-storage/d.txt",
Size: int64p(int64(len("d"))),
},
assertPaths: []string{
"/reconcile-storage/d.txt",
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
@ -238,7 +242,7 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil {
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil {
return err
}
return nil
@ -260,6 +264,10 @@ func TestHelmChartReconciler_reconcileStorage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
defer func() {
g.Expect(os.RemoveAll(filepath.Join(testStorage.BasePath, "/reconcile-storage"))).To(Succeed())
}()
r := &HelmChartReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
@ -303,7 +311,7 @@ func TestHelmChartReconciler_reconcileSource(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
defer os.RemoveAll(tmpDir)
storage, err := NewStorage(tmpDir, "example.com", timeout)
storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords)
g.Expect(err).ToNot(HaveOccurred())
gitArtifact := &sourcev1.Artifact{
@ -777,7 +785,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
defer os.RemoveAll(tmpDir)
-storage, err := NewStorage(tmpDir, "example.com", timeout)
+storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords)
g.Expect(err).ToNot(HaveOccurred())
chartsArtifact := &sourcev1.Artifact{

View File

@ -241,6 +241,9 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.
// they match the Storage server hostname of current runtime.
func (r *HelmRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.HelmRepository, _ *sourcev1.Artifact, _ *repository.ChartRepository) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
// Abort if it takes more than 5 seconds.
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_ = r.garbageCollect(ctx, obj)
// Determine if the advertised artifact is still in storage
@ -515,14 +518,19 @@ func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sour
return nil
}
if obj.GetArtifact() != nil {
-if deleted, err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
-return &serror.Event{
-Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
+delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5)
+if err != nil {
+e := &serror.Event{
+Err: fmt.Errorf("garbage collection of artifacts failed: %w", err),
Reason: "GarbageCollectionFailed",
}
-} else if len(deleted) > 0 {
+r.eventLogf(ctx, obj, corev1.EventTypeWarning, e.Reason, e.Err.Error())
+return e
+}
+if len(delFiles) > 0 {
r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded",
-"garbage collected old artifacts")
+fmt.Sprintf("garbage collected %d artifacts", len(delFiles)))
+return nil
}
}
return nil

View File

@ -24,6 +24,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"
"github.com/darkowlzz/controller-check/status"
"github.com/fluxcd/pkg/apis/meta"
@ -146,7 +147,7 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) {
{
name: "garbage collects",
beforeFunc: func(obj *sourcev1.HelmRepository, storage *Storage) error {
revisions := []string{"a", "b", "c"}
revisions := []string{"a", "b", "c", "d"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
@ -156,21 +157,25 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
-if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
+if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0o644); err != nil {
return err
}
+if n != len(revisions)-1 {
+time.Sleep(time.Second * 1)
+}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/c.txt",
Revision: "c",
Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6",
URL: testStorage.Hostname + "/reconcile-storage/c.txt",
Size: int64p(int64(len("c"))),
Path: "/reconcile-storage/d.txt",
Revision: "d",
Checksum: "18ac3e7343f016890c510e93f935261169d9e3f565436429830faf0934f4f8e4",
URL: testStorage.Hostname + "/reconcile-storage/d.txt",
Size: int64p(int64(len("d"))),
},
assertPaths: []string{
"/reconcile-storage/d.txt",
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
@ -207,7 +212,7 @@ func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) {
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil {
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o644); err != nil {
return err
}
return nil

View File

@ -19,6 +19,7 @@ package controllers
import (
"archive/tar"
"compress/gzip"
"context"
"crypto/sha256"
"fmt"
"hash"
@ -26,21 +27,28 @@ import (
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"time"
securejoin "github.com/cyphar/filepath-securejoin"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
kerrors "k8s.io/apimachinery/pkg/util/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/fluxcd/pkg/lockedfile"
"io/fs"
"github.com/fluxcd/pkg/untar"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/fs"
sourcefs "github.com/fluxcd/source-controller/internal/fs"
"github.com/fluxcd/source-controller/pkg/sourceignore"
)
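// GarbageCountLimit is the upper bound on the number of files walked in a single
// garbage collection run; getGarbageFiles stops walking the artifact dir once
// this many files have been visited.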
const GarbageCountLimit = 1000
// Storage manages artifacts
type Storage struct {
// BasePath is the local directory path where the source artifacts are stored.
@ -49,19 +57,25 @@ type Storage struct {
// Hostname is the file server host name used to compose the artifacts URIs.
Hostname string `json:"hostname"`
-// Timeout for artifacts operations
-Timeout time.Duration `json:"timeout"`
+// ArtifactRetentionTTL is the duration of time that artifacts will be kept in
+// storage before being garbage collected.
+ArtifactRetentionTTL time.Duration `json:"artifactRetentionTTL"`
+// ArtifactRetentionRecords is the maximum number of artifacts to be kept in
+// storage after a garbage collection.
+ArtifactRetentionRecords int `json:"artifactRetentionRecords"`
}
// NewStorage creates the storage helper for a given path and hostname.
-func NewStorage(basePath string, hostname string, timeout time.Duration) (*Storage, error) {
+func NewStorage(basePath string, hostname string, artifactRetentionTTL time.Duration, artifactRetentionRecords int) (*Storage, error) {
if f, err := os.Stat(basePath); os.IsNotExist(err) || !f.IsDir() {
return nil, fmt.Errorf("invalid dir path: %s", basePath)
}
return &Storage{
BasePath: basePath,
Hostname: hostname,
-Timeout: timeout,
+ArtifactRetentionTTL: artifactRetentionTTL,
+ArtifactRetentionRecords: artifactRetentionRecords,
}, nil
}
@ -145,6 +159,150 @@ func (s *Storage) RemoveAllButCurrent(artifact sourcev1.Artifact) ([]string, err
return deletedFiles, nil
}
// getGarbageFiles returns all files that need to be garbage collected for the given artifact.
// Garbage files are determined based on the below flow:
// 1. collect all files whose ttl has expired
// 2. if the number of files that remain is already within maxItemsToBeRetained, return
// 3. else, also collect the oldest remaining files until only the latest n files remain, where n=maxItemsToBeRetained
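// For example (hypothetical numbers, not from this changeset): with ttl=1m and
// maxItemsToBeRetained=2, five artifacts aged 3m, 2m, 40s, 20s and 5s yield two
// ttl-expired files in step 1; three files then remain, which still exceeds the
// limit of 2, so step 3 also collects the oldest survivor (the 40s-old file).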
func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) {
localPath := s.LocalPath(artifact)
dir := filepath.Dir(localPath)
garbageFiles := []string{}
filesWithCreatedTs := make(map[time.Time]string)
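// Note: files are keyed by their modification time, which assumes no two
// artifacts in the directory share an identical timestamp.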
// sortedPaths contain all files sorted according to their created ts.
sortedPaths := []string{}
now := time.Now().UTC()
totalFiles := 0
var errors []string
creationTimestamps := []time.Time{}
_ = filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
errors = append(errors, err.Error())
return nil
}
if totalFiles >= totalCountLimit {
return fmt.Errorf("Reached file walking limit, already walked over: %d", totalFiles)
}
info, err := d.Info()
if err != nil {
errors = append(errors, err.Error())
return nil
}
createdAt := info.ModTime().UTC()
diff := now.Sub(createdAt)
// compare the time difference between now and the time at which the file was created
// with the provided ttl. delete if difference is greater than the ttl.
expired := diff > ttl
if !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink {
if path != localPath && expired {
garbageFiles = append(garbageFiles, path)
}
totalFiles += 1
filesWithCreatedTs[createdAt] = path
creationTimestamps = append(creationTimestamps, createdAt)
}
return nil
})
if len(errors) > 0 {
return nil, fmt.Errorf("can't walk over file: %s", strings.Join(errors, ","))
}
// We already collected enough garbage files to satisfy the no. of max
// items that are supposed to be retained, so exit early.
if totalFiles-len(garbageFiles) < maxItemsToBeRetained {
return garbageFiles, nil
}
// sort all timestamps in an ascending order.
sort.Slice(creationTimestamps, func(i, j int) bool { return creationTimestamps[i].Before(creationTimestamps[j]) })
for _, ts := range creationTimestamps {
path, ok := filesWithCreatedTs[ts]
if !ok {
return garbageFiles, fmt.Errorf("failed to fetch file for created ts: %v", ts)
}
sortedPaths = append(sortedPaths, path)
}
var collected int
noOfGarbageFiles := len(garbageFiles)
for _, path := range sortedPaths {
if path != localPath && !stringInSlice(path, garbageFiles) {
// If we previously collected a few garbage files with an expired ttl, then take that into account
// when checking whether we need to remove more files to satisfy the max no. of items allowed
// in the filesystem, along with the no. of files already removed in this loop.
if noOfGarbageFiles > 0 {
if (len(sortedPaths) - collected - len(garbageFiles)) > maxItemsToBeRetained {
garbageFiles = append(garbageFiles, path)
collected += 1
}
} else {
if len(sortedPaths)-collected > maxItemsToBeRetained {
garbageFiles = append(garbageFiles, path)
collected += 1
}
}
}
}
return garbageFiles, nil
}
// GarbageCollect removes all garbage files in the artifact dir according to the provided
// retention options.
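// It returns the paths of the deleted files, and aborts with the context's error
// if the collection run exceeds the provided timeout.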
func (s *Storage) GarbageCollect(ctx context.Context, artifact sourcev1.Artifact, timeout time.Duration) ([]string, error) {
delFilesChan := make(chan []string)
errChan := make(chan error)
// Abort if it takes more than the provided timeout duration.
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
go func() {
garbageFiles, err := s.getGarbageFiles(artifact, GarbageCountLimit, s.ArtifactRetentionRecords, s.ArtifactRetentionTTL)
if err != nil {
errChan <- err
return
}
var errors []error
var deleted []string
if len(garbageFiles) > 0 {
for _, file := range garbageFiles {
err := os.Remove(file)
if err != nil {
errors = append(errors, err)
} else {
deleted = append(deleted, file)
}
}
}
if len(errors) > 0 {
errChan <- kerrors.NewAggregate(errors)
return
}
delFilesChan <- deleted
}()
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case delFiles := <-delFilesChan:
return delFiles, nil
case err := <-errChan:
return nil, err
}
}
}
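// stringInSlice reports whether a is present in list.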
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
// ArtifactExist returns a boolean indicating whether the v1beta1.Artifact exists in storage and is a regular file.
func (s *Storage) ArtifactExist(artifact sourcev1.Artifact) bool {
fi, err := os.Lstat(s.LocalPath(artifact))
@ -281,7 +439,7 @@ func (s *Storage) Archive(artifact *sourcev1.Artifact, dir string, filter Archiv
return err
}
-if err := fs.RenameWithFallback(tmpName, localPath); err != nil {
+if err := sourcefs.RenameWithFallback(tmpName, localPath); err != nil {
return err
}
@ -323,7 +481,7 @@ func (s *Storage) AtomicWriteFile(artifact *sourcev1.Artifact, reader io.Reader,
return err
}
-if err := fs.RenameWithFallback(tfName, localPath); err != nil {
+if err := sourcefs.RenameWithFallback(tfName, localPath); err != nil {
return err
}
@ -361,7 +519,7 @@ func (s *Storage) Copy(artifact *sourcev1.Artifact, reader io.Reader) (err error
return err
}
-if err := fs.RenameWithFallback(tfName, localPath); err != nil {
+if err := sourcefs.RenameWithFallback(tfName, localPath); err != nil {
return err
}
@ -421,7 +579,7 @@ func (s *Storage) CopyToPath(artifact *sourcev1.Artifact, subPath, toPath string
if err != nil {
return err
}
-if err := fs.RenameWithFallback(fromPath, toPath); err != nil {
+if err := sourcefs.RenameWithFallback(fromPath, toPath); err != nil {
return err
}
return nil
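
For orientation, a minimal usage sketch of the retention API introduced above (not part of this changeset): it assumes only the exported identifiers shown in this diff (controllers.NewStorage, Storage.GarbageCollect, sourcev1.Artifact); the base path, hostname and artifact path are illustrative values.

package main

import (
	"context"
	"fmt"
	"time"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
	"github.com/fluxcd/source-controller/controllers"
)

func main() {
	// Keep artifacts for at most one hour, and retain at most two records
	// per source after a collection run ("/data" must be an existing directory).
	storage, err := controllers.NewStorage("/data", "source-controller.flux-system.svc", time.Hour, 2)
	if err != nil {
		panic(err)
	}
	// Hypothetical artifact path; GarbageCollect walks this path's directory.
	artifact := sourcev1.Artifact{Path: "gitrepository/flux-system/podinfo/latest.tar.gz"}
	// Bound the whole collection run to five seconds, as the reconcilers above do.
	deleted, err := storage.GarbageCollect(context.Background(), artifact, 5*time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Printf("garbage collected %d artifacts\n", len(deleted))
}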

View File

@ -19,11 +19,13 @@ package controllers
import (
"archive/tar"
"compress/gzip"
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"
@ -48,7 +50,7 @@ func TestStorageConstructor(t *testing.T) {
}
t.Cleanup(cleanupStoragePath(dir))
if _, err := NewStorage("/nonexistent", "hostname", time.Minute); err == nil {
if _, err := NewStorage("/nonexistent", "hostname", time.Minute, 2); err == nil {
t.Fatal("nonexistent path was allowable in storage constructor")
}
@ -58,13 +60,13 @@ func TestStorageConstructor(t *testing.T) {
}
f.Close()
-if _, err := NewStorage(f.Name(), "hostname", time.Minute); err == nil {
+if _, err := NewStorage(f.Name(), "hostname", time.Minute, 2); err == nil {
os.Remove(f.Name())
t.Fatal("file path was accepted as basedir")
}
os.Remove(f.Name())
-if _, err := NewStorage(dir, "hostname", time.Minute); err != nil {
+if _, err := NewStorage(dir, "hostname", time.Minute, 2); err != nil {
t.Fatalf("Valid path did not successfully return: %v", err)
}
}
@ -117,7 +119,7 @@ func TestStorage_Archive(t *testing.T) {
}
t.Cleanup(cleanupStoragePath(dir))
-storage, err := NewStorage(dir, "hostname", time.Minute)
+storage, err := NewStorage(dir, "hostname", time.Minute, 2)
if err != nil {
t.Fatalf("error while bootstrapping storage: %v", err)
}
@ -289,7 +291,7 @@ func TestStorageRemoveAllButCurrent(t *testing.T) {
}
t.Cleanup(func() { os.RemoveAll(dir) })
-s, err := NewStorage(dir, "hostname", time.Minute)
+s, err := NewStorage(dir, "hostname", time.Minute, 2)
if err != nil {
t.Fatalf("Valid path did not successfully return: %v", err)
}
@ -305,7 +307,7 @@ func TestStorageRemoveAllButCurrent(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
t.Cleanup(func() { os.RemoveAll(dir) })
-s, err := NewStorage(dir, "hostname", time.Minute)
+s, err := NewStorage(dir, "hostname", time.Minute, 2)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := sourcev1.Artifact{
@ -368,7 +370,7 @@ func TestStorageRemoveAll(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
t.Cleanup(func() { os.RemoveAll(dir) })
-s, err := NewStorage(dir, "hostname", time.Minute)
+s, err := NewStorage(dir, "hostname", time.Minute, 2)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := sourcev1.Artifact{
@ -398,7 +400,7 @@ func TestStorageCopyFromPath(t *testing.T) {
}
t.Cleanup(cleanupStoragePath(dir))
-storage, err := NewStorage(dir, "hostname", time.Minute)
+storage, err := NewStorage(dir, "hostname", time.Minute, 2)
if err != nil {
t.Fatalf("error while bootstrapping storage: %v", err)
}
@ -486,3 +488,218 @@ func TestStorageCopyFromPath(t *testing.T) {
})
}
}
func TestStorage_getGarbageFiles(t *testing.T) {
artifactFolder := path.Join("foo", "bar")
tests := []struct {
name string
artifactPaths []string
createPause time.Duration
ttl time.Duration
maxItemsToBeRetained int
totalCountLimit int
wantDeleted []string
}{
{
name: "delete files based on maxItemsToBeRetained",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
path.Join(artifactFolder, "artifact5.tar.gz"),
},
createPause: time.Millisecond * 10,
ttl: time.Minute * 2,
totalCountLimit: 10,
maxItemsToBeRetained: 2,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
},
},
{
name: "delete files based on ttl",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
path.Join(artifactFolder, "artifact5.tar.gz"),
},
createPause: time.Second * 1,
ttl: time.Second*3 + time.Millisecond*500,
totalCountLimit: 10,
maxItemsToBeRetained: 4,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
},
},
{
name: "delete files based on ttl and maxItemsToBeRetained",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
path.Join(artifactFolder, "artifact5.tar.gz"),
path.Join(artifactFolder, "artifact6.tar.gz"),
},
createPause: time.Second * 1,
ttl: time.Second*5 + time.Millisecond*500,
totalCountLimit: 10,
maxItemsToBeRetained: 4,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
},
},
{
name: "delete files based on ttl and maxItemsToBeRetained and totalCountLimit",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
path.Join(artifactFolder, "artifact5.tar.gz"),
path.Join(artifactFolder, "artifact6.tar.gz"),
},
createPause: time.Millisecond * 500,
ttl: time.Millisecond * 500,
totalCountLimit: 3,
maxItemsToBeRetained: 2,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
dir, err := os.MkdirTemp("", "")
g.Expect(err).ToNot(HaveOccurred())
t.Cleanup(func() { os.RemoveAll(dir) })
s, err := NewStorage(dir, "hostname", tt.ttl, tt.maxItemsToBeRetained)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := sourcev1.Artifact{
Path: tt.artifactPaths[len(tt.artifactPaths)-1],
}
g.Expect(os.MkdirAll(path.Join(dir, artifactFolder), 0o755)).ToNot(HaveOccurred())
for _, artifactPath := range tt.artifactPaths {
f, err := os.Create(path.Join(dir, artifactPath))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(f.Close()).ToNot(HaveOccurred())
time.Sleep(tt.createPause)
}
deletedPaths, err := s.getGarbageFiles(artifact, tt.totalCountLimit, tt.maxItemsToBeRetained, tt.ttl)
g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files")
g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths)))
for _, wantDeletedPath := range tt.wantDeleted {
present := false
for _, deletedPath := range deletedPaths {
if strings.Contains(deletedPath, wantDeletedPath) {
present = true
break
}
}
if !present {
g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath))
}
}
})
}
}
func TestStorage_GarbageCollect(t *testing.T) {
artifactFolder := path.Join("foo", "bar")
tests := []struct {
name string
artifactPaths []string
wantDeleted []string
wantErr string
ctxTimeout time.Duration
}{
{
name: "garbage collects",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
},
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
},
ctxTimeout: time.Second * 1,
},
{
name: "garbage collection fails with context timeout",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
},
wantErr: "context deadline exceeded",
ctxTimeout: time.Nanosecond * 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
dir, err := os.MkdirTemp("", "")
g.Expect(err).ToNot(HaveOccurred())
t.Cleanup(func() { os.RemoveAll(dir) })
s, err := NewStorage(dir, "hostname", time.Second*2, 2)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := sourcev1.Artifact{
Path: tt.artifactPaths[len(tt.artifactPaths)-1],
}
g.Expect(os.MkdirAll(path.Join(dir, artifactFolder), 0o755)).ToNot(HaveOccurred())
for i, artifactPath := range tt.artifactPaths {
f, err := os.Create(path.Join(dir, artifactPath))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(f.Close()).ToNot(HaveOccurred())
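// Pause between writes so each artifact gets a distinct mtime; collection
// order and ttl checks in GarbageCollect are based on ModTime.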
if i != len(tt.artifactPaths)-1 {
time.Sleep(time.Second * 1)
}
}
deletedPaths, err := s.GarbageCollect(context.TODO(), artifact, tt.ctxTimeout)
if tt.wantErr == "" {
g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files")
} else {
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring(tt.wantErr))
}
if len(tt.wantDeleted) > 0 {
g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths)))
for _, wantDeletedPath := range tt.wantDeleted {
present := false
for _, deletedPath := range deletedPaths {
if strings.Contains(deletedPath, wantDeletedPath) {
g.Expect(deletedPath).ToNot(BeAnExistingFile())
present = true
break
}
}
if !present {
g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath))
}
}
}
})
}
}

View File

@ -48,6 +48,8 @@ import (
const (
timeout = 10 * time.Second
interval = 1 * time.Second
retentionTTL = 2 * time.Second
retentionRecords = 2
)
var (
@ -181,7 +183,7 @@ func initTestTLS() {
}
func newTestStorage(s *testserver.HTTPServer) (*Storage, error) {
-storage, err := NewStorage(s.Root(), s.URL(), timeout)
+storage, err := NewStorage(s.Root(), s.URL(), retentionTTL, retentionRecords)
if err != nil {
return nil, err
}

main.go
View File

@ -92,6 +92,8 @@ func main() {
helmCacheTTL string
helmCachePurgeInterval string
kexAlgos []string
artifactRetentionTTL time.Duration
artifactRetentionRecords int
)
flag.StringVar(&metricsAddr, "metrics-addr", envOrDefault("METRICS_ADDR", ":8080"),
@ -124,6 +126,10 @@ func main() {
"The interval at which the cache is purged. Valid time units are ns, us (or µs), ms, s, m, h.")
flag.StringSliceVar(&kexAlgos, "ssh-kex-algos", []string{},
"The list of key exchange algorithms to use for ssh connections, arranged from most preferred to the least.")
flag.DurationVar(&artifactRetentionTTL, "artifact-retention-ttl", 60*time.Second,
"The duration of time that artifacts will be kept in storage before being garbage collected.")
flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2,
"The maximum number of artifacts to be kept in storage after a garbage collection.")
clientOptions.BindFlags(flag.CommandLine)
logOptions.BindFlags(flag.CommandLine)
@ -177,7 +183,7 @@ func main() {
if storageAdvAddr == "" {
storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog)
}
-storage := mustInitStorage(storagePath, storageAdvAddr, setupLog)
+storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, setupLog)
setPreferredKexAlgos(kexAlgos)
if err = (&controllers.GitRepositoryReconciler{
@ -283,14 +289,14 @@ func startFileServer(path string, address string, l logr.Logger) {
}
}
-func mustInitStorage(path string, storageAdvAddr string, l logr.Logger) *controllers.Storage {
+func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, l logr.Logger) *controllers.Storage {
if path == "" {
p, _ := os.Getwd()
path = filepath.Join(p, "bin")
os.MkdirAll(path, 0777)
}
-storage, err := controllers.NewStorage(path, storageAdvAddr, 5*time.Minute)
+storage, err := controllers.NewStorage(path, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords)
if err != nil {
l.Error(err, "unable to initialise storage")
os.Exit(1)

View File

@ -174,7 +174,7 @@ func startEnvServer(setupReconcilers func(manager.Manager)) *envtest.Environment
panic(err)
}
defer os.RemoveAll(tmpStoragePath)
-storage, err = controllers.NewStorage(tmpStoragePath, "localhost:5050", time.Second*30)
+storage, err = controllers.NewStorage(tmpStoragePath, "localhost:5050", time.Minute*1, 2)
if err != nil {
panic(err)
}