Introduce artifact max size limit of 50MiB
Add a controller flag named "--artifact-max-size=<bytes>" with the default value of 50MiB. To disable the limit, the value can be set to "--artifact-max-size=-1". The flag enforces a max size limit for the artifact contents produced by source-controller, to avoid out-of-memory crashes of consumers such as kustomize-controller. Signed-off-by: Stefan Prodan <stefan.prodan@gmail.com>
This commit is contained in:
parent
35ea086358
commit
3168bb6051
|
|
@ -422,7 +422,7 @@ func TestHelmChartReconciler_reconcileSource(t *testing.T) {
|
|||
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords)
|
||||
storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords, 0)
|
||||
g.Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
gitArtifact := &sourcev1.Artifact{
|
||||
|
|
@ -902,7 +902,7 @@ func TestHelmChartReconciler_buildFromOCIHelmRepository(t *testing.T) {
|
|||
metadata, err := loadTestChartToOCI(chartData, chartPath, testRegistryServer)
|
||||
g.Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords)
|
||||
storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords, 0)
|
||||
g.Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
cachedArtifact := &sourcev1.Artifact{
|
||||
|
|
@ -1119,7 +1119,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) {
|
|||
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords)
|
||||
storage, err := NewStorage(tmpDir, "example.com", retentionTTL, retentionRecords, 0)
|
||||
g.Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
chartsArtifact := &sourcev1.Artifact{
|
||||
|
|
|
|||
|
|
@ -61,10 +61,14 @@ type Storage struct {
|
|||
// ArtifactRetentionRecords is the maximum number of artifacts to be kept in
|
||||
// storage after a garbage collection.
|
||||
ArtifactRetentionRecords int `json:"artifactRetentionRecords"`
|
||||
|
||||
// ArtifactMaxSize sets the max size in bytes for an artifact contents.
|
||||
// Setting the value to zero or a negative value, disables the limit.
|
||||
ArtifactMaxSize int64 `json:"artifactMaxSize"`
|
||||
}
|
||||
|
||||
// NewStorage creates the storage helper for a given path and hostname.
|
||||
func NewStorage(basePath string, hostname string, artifactRetentionTTL time.Duration, artifactRetentionRecords int) (*Storage, error) {
|
||||
func NewStorage(basePath string, hostname string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, artifactMaxSize int64) (*Storage, error) {
|
||||
if f, err := os.Stat(basePath); os.IsNotExist(err) || !f.IsDir() {
|
||||
return nil, fmt.Errorf("invalid dir path: %s", basePath)
|
||||
}
|
||||
|
|
@ -73,6 +77,7 @@ func NewStorage(basePath string, hostname string, artifactRetentionTTL time.Dura
|
|||
Hostname: hostname,
|
||||
ArtifactRetentionTTL: artifactRetentionTTL,
|
||||
ArtifactRetentionRecords: artifactRetentionRecords,
|
||||
ArtifactMaxSize: artifactMaxSize,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -432,6 +437,11 @@ func (s *Storage) Archive(artifact *sourcev1.Artifact, dir string, filter Archiv
|
|||
return err
|
||||
}
|
||||
|
||||
if s.ArtifactMaxSize > 0 && sz.written > s.ArtifactMaxSize {
|
||||
return fmt.Errorf("artifact size %d exceeds the max limit of %d bytes, use .sourceignore to exclude files from the artifact",
|
||||
sz.written, s.ArtifactMaxSize)
|
||||
}
|
||||
|
||||
if err := os.Chmod(tmpName, 0o600); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ import (
|
|||
func TestStorageConstructor(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
if _, err := NewStorage("/nonexistent", "hostname", time.Minute, 2); err == nil {
|
||||
if _, err := NewStorage("/nonexistent", "hostname", time.Minute, 2, 0); err == nil {
|
||||
t.Fatal("nonexistent path was allowable in storage constructor")
|
||||
}
|
||||
|
||||
|
|
@ -48,13 +48,13 @@ func TestStorageConstructor(t *testing.T) {
|
|||
}
|
||||
f.Close()
|
||||
|
||||
if _, err := NewStorage(f.Name(), "hostname", time.Minute, 2); err == nil {
|
||||
if _, err := NewStorage(f.Name(), "hostname", time.Minute, 2, 0); err == nil {
|
||||
os.Remove(f.Name())
|
||||
t.Fatal("file path was accepted as basedir")
|
||||
}
|
||||
os.Remove(f.Name())
|
||||
|
||||
if _, err := NewStorage(dir, "hostname", time.Minute, 2); err != nil {
|
||||
if _, err := NewStorage(dir, "hostname", time.Minute, 2, 0); err != nil {
|
||||
t.Fatalf("Valid path did not successfully return: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -103,7 +103,7 @@ func walkTar(tarFile string, match string, dir bool) (int64, bool, error) {
|
|||
func TestStorage_Archive(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
storage, err := NewStorage(dir, "hostname", time.Minute, 2)
|
||||
storage, err := NewStorage(dir, "hostname", time.Minute, 2, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("error while bootstrapping storage: %v", err)
|
||||
}
|
||||
|
|
@ -263,7 +263,7 @@ func TestStorageRemoveAllButCurrent(t *testing.T) {
|
|||
t.Run("bad directory in archive", func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
s, err := NewStorage(dir, "hostname", time.Minute, 2)
|
||||
s, err := NewStorage(dir, "hostname", time.Minute, 2, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Valid path did not successfully return: %v", err)
|
||||
}
|
||||
|
|
@ -277,7 +277,7 @@ func TestStorageRemoveAllButCurrent(t *testing.T) {
|
|||
g := NewWithT(t)
|
||||
dir := t.TempDir()
|
||||
|
||||
s, err := NewStorage(dir, "hostname", time.Minute, 2)
|
||||
s, err := NewStorage(dir, "hostname", time.Minute, 2, 0)
|
||||
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
|
||||
|
||||
artifact := sourcev1.Artifact{
|
||||
|
|
@ -338,7 +338,7 @@ func TestStorageRemoveAll(t *testing.T) {
|
|||
g := NewWithT(t)
|
||||
dir := t.TempDir()
|
||||
|
||||
s, err := NewStorage(dir, "hostname", time.Minute, 2)
|
||||
s, err := NewStorage(dir, "hostname", time.Minute, 2, 0)
|
||||
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
|
||||
|
||||
artifact := sourcev1.Artifact{
|
||||
|
|
@ -364,7 +364,7 @@ func TestStorageCopyFromPath(t *testing.T) {
|
|||
|
||||
dir := t.TempDir()
|
||||
|
||||
storage, err := NewStorage(dir, "hostname", time.Minute, 2)
|
||||
storage, err := NewStorage(dir, "hostname", time.Minute, 2, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("error while bootstrapping storage: %v", err)
|
||||
}
|
||||
|
|
@ -542,7 +542,7 @@ func TestStorage_getGarbageFiles(t *testing.T) {
|
|||
g := NewWithT(t)
|
||||
dir := t.TempDir()
|
||||
|
||||
s, err := NewStorage(dir, "hostname", tt.ttl, tt.maxItemsToBeRetained)
|
||||
s, err := NewStorage(dir, "hostname", tt.ttl, tt.maxItemsToBeRetained, 0)
|
||||
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
|
||||
|
||||
artifact := sourcev1.Artifact{
|
||||
|
|
@ -616,7 +616,7 @@ func TestStorage_GarbageCollect(t *testing.T) {
|
|||
g := NewWithT(t)
|
||||
dir := t.TempDir()
|
||||
|
||||
s, err := NewStorage(dir, "hostname", time.Second*2, 2)
|
||||
s, err := NewStorage(dir, "hostname", time.Second*2, 2, 0)
|
||||
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
|
||||
|
||||
artifact := sourcev1.Artifact{
|
||||
|
|
@ -658,3 +658,90 @@ func TestStorage_GarbageCollect(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorage_MaxSize(t *testing.T) {
|
||||
createFiles := func(files map[string][]byte) (dir string, err error) {
|
||||
dir = t.TempDir()
|
||||
for name, b := range files {
|
||||
absPath := filepath.Join(dir, name)
|
||||
if err = os.MkdirAll(filepath.Dir(absPath), 0o750); err != nil {
|
||||
return
|
||||
}
|
||||
f, err := os.Create(absPath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not create file %q: %w", absPath, err)
|
||||
}
|
||||
if n, err := f.Write(b); err != nil {
|
||||
f.Close()
|
||||
return "", fmt.Errorf("could not write %d bytes to file %q: %w", n, f.Name(), err)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
files map[string][]byte
|
||||
maxSize int64
|
||||
wantErrMatch string
|
||||
}{
|
||||
{
|
||||
name: "creates artifact without size limit",
|
||||
files: map[string][]byte{
|
||||
"test.txt": []byte(`contents`),
|
||||
"test.yaml": []byte(`a: b`),
|
||||
},
|
||||
maxSize: -1,
|
||||
wantErrMatch: "",
|
||||
},
|
||||
{
|
||||
name: "fails to create artifact due to size limit",
|
||||
files: map[string][]byte{
|
||||
"test.txt": []byte(`contents`),
|
||||
"test.yaml": []byte(`a: b`),
|
||||
},
|
||||
maxSize: 200,
|
||||
wantErrMatch: "exceeds the max limit",
|
||||
},
|
||||
{
|
||||
name: "creates artifact in the size limit range",
|
||||
files: map[string][]byte{
|
||||
"test.txt": []byte(`contents`),
|
||||
"test.yaml": []byte(`a: b`),
|
||||
},
|
||||
maxSize: 300,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
|
||||
dir, err := createFiles(tt.files)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
artifact := sourcev1.Artifact{
|
||||
Path: filepath.Join(randStringRunes(10), randStringRunes(10), randStringRunes(10)+".tar.gz"),
|
||||
}
|
||||
|
||||
s, err := NewStorage(dir, "hostname", time.Second*2, 2, tt.maxSize)
|
||||
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
|
||||
|
||||
if err := s.MkdirAll(artifact); err != nil {
|
||||
t.Fatalf("artifact directory creation failed: %v", err)
|
||||
}
|
||||
|
||||
err = s.Archive(&artifact, dir, SourceIgnoreFilter(nil, nil))
|
||||
if tt.wantErrMatch == "" {
|
||||
g.Expect(err).ToNot(HaveOccurred())
|
||||
} else {
|
||||
g.Expect(err.Error()).To(ContainSubstring(tt.wantErrMatch))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -354,7 +354,7 @@ func initTestTLS() {
|
|||
}
|
||||
|
||||
func newTestStorage(s *testserver.HTTPServer) (*Storage, error) {
|
||||
storage, err := NewStorage(s.Root(), s.URL(), retentionTTL, retentionRecords)
|
||||
storage, err := NewStorage(s.Root(), s.URL(), retentionTTL, retentionRecords, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
10
main.go
10
main.go
|
|
@ -54,6 +54,7 @@ import (
|
|||
)
|
||||
|
||||
const controllerName = "source-controller"
|
||||
const artifactMaxSizeDefault int64 = 50 << 20
|
||||
|
||||
var (
|
||||
scheme = runtime.NewScheme()
|
||||
|
|
@ -101,6 +102,7 @@ func main() {
|
|||
helmCachePurgeInterval string
|
||||
artifactRetentionTTL time.Duration
|
||||
artifactRetentionRecords int
|
||||
artifactMaxSize int64
|
||||
)
|
||||
|
||||
flag.StringVar(&metricsAddr, "metrics-addr", envOrDefault("METRICS_ADDR", ":8080"),
|
||||
|
|
@ -139,6 +141,8 @@ func main() {
|
|||
"The duration of time that artifacts will be kept in storage before being garbage collected.")
|
||||
flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2,
|
||||
"The maximum number of artifacts to be kept in storage after a garbage collection.")
|
||||
flag.Int64Var(&artifactMaxSize, "artifact-max-size", artifactMaxSizeDefault,
|
||||
"The max allowed size in bytes of an artifact contents produced from sources. The limit can be disabled by setting the value to zero or a negative value.")
|
||||
|
||||
clientOptions.BindFlags(flag.CommandLine)
|
||||
logOptions.BindFlags(flag.CommandLine)
|
||||
|
|
@ -202,7 +206,7 @@ func main() {
|
|||
if storageAdvAddr == "" {
|
||||
storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog)
|
||||
}
|
||||
storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, setupLog)
|
||||
storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, artifactMaxSize, setupLog)
|
||||
|
||||
if err = managed.InitManagedTransport(); err != nil {
|
||||
// Log the error, but don't exit so as to not block reconcilers that are healthy.
|
||||
|
|
@ -350,14 +354,14 @@ func startFileServer(path string, address string, l logr.Logger) {
|
|||
}
|
||||
}
|
||||
|
||||
func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, l logr.Logger) *controllers.Storage {
|
||||
func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, artifactMaxSize int64, l logr.Logger) *controllers.Storage {
|
||||
if path == "" {
|
||||
p, _ := os.Getwd()
|
||||
path = filepath.Join(p, "bin")
|
||||
os.MkdirAll(path, 0o700)
|
||||
}
|
||||
|
||||
storage, err := controllers.NewStorage(path, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords)
|
||||
storage, err := controllers.NewStorage(path, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, artifactMaxSize)
|
||||
if err != nil {
|
||||
l.Error(err, "unable to initialise storage")
|
||||
os.Exit(1)
|
||||
|
|
|
|||
|
|
@ -174,7 +174,7 @@ func startEnvServer(setupReconcilers func(manager.Manager)) *envtest.Environment
|
|||
panic(err)
|
||||
}
|
||||
defer os.RemoveAll(tmpStoragePath)
|
||||
storage, err = controllers.NewStorage(tmpStoragePath, "localhost:5050", time.Minute*1, 2)
|
||||
storage, err = controllers.NewStorage(tmpStoragePath, "localhost:5050", time.Minute*1, 2, 0)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue