Refactor controller to use `fluxcd/pkg/artifact`

Signed-off-by: Stefan Prodan <stefan.prodan@gmail.com>
Stefan Prodan 2025-09-04 16:18:24 +03:00
parent c8358d063c
commit 87ca533b83
21 changed files with 163 additions and 2089 deletions
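
The refactor swaps the controller's in-repo `internal/digest` and `internal/storage` packages for their upstream counterparts in `github.com/fluxcd/pkg/artifact`. The most visible API change in the diff is the storage constructor, which moves from positional arguments to a `config.Options` struct. Below is a minimal sketch of the new wiring; the field names are taken from the test changes in this commit, while the path and address values are placeholders:

```go
package main

import (
	"log"
	"time"

	"github.com/fluxcd/pkg/artifact/config"
	"github.com/fluxcd/pkg/artifact/digest"
	"github.com/fluxcd/pkg/artifact/storage"
)

func main() {
	// Before this commit: storage.New(path, hostname, retentionTTL, retentionRecords).
	// After: all settings travel through config.Options.
	opts := &config.Options{
		StoragePath:              "/data",       // placeholder: must be an existing directory
		StorageAddress:           "example.com", // placeholder: file server bind address
		StorageAdvAddress:        "example.com", // placeholder: address advertised in artifact URLs
		ArtifactRetentionTTL:     time.Minute,
		ArtifactRetentionRecords: 2,
		ArtifactDigestAlgo:       digest.Canonical.String(), // "sha256"
	}

	st, err := storage.New(opts)
	if err != nil {
		log.Fatalf("failed to initialize artifact storage: %v", err)
	}
	_ = st // used by the reconcilers for Archive, CopyFromPath, etc.
}
```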

go.mod (13 lines changed)

@@ -18,12 +18,13 @@ require (
github.com/Masterminds/semver/v3 v3.4.0
github.com/cyphar/filepath-securejoin v0.4.1
github.com/distribution/distribution/v3 v3.0.0
github.com/docker/cli v28.3.3+incompatible
github.com/docker/cli v28.4.0+incompatible
github.com/docker/go-units v0.5.0
github.com/elazarl/goproxy v1.7.2
github.com/fluxcd/cli-utils v0.36.0-flux.15
github.com/fluxcd/pkg/apis/event v0.19.0
github.com/fluxcd/pkg/apis/meta v1.21.0
github.com/fluxcd/pkg/artifact v0.2.0
github.com/fluxcd/pkg/auth v0.30.0
github.com/fluxcd/pkg/cache v0.11.0
github.com/fluxcd/pkg/git v0.36.0
@@ -31,10 +32,9 @@
github.com/fluxcd/pkg/gittestserver v0.20.0
github.com/fluxcd/pkg/helmtestserver v0.29.0
github.com/fluxcd/pkg/http/transport v0.7.0
github.com/fluxcd/pkg/lockedfile v0.7.0
github.com/fluxcd/pkg/masktoken v0.8.0
github.com/fluxcd/pkg/oci v0.54.0
github.com/fluxcd/pkg/runtime v0.82.0
github.com/fluxcd/pkg/oci v0.55.0
github.com/fluxcd/pkg/runtime v0.83.0
github.com/fluxcd/pkg/sourceignore v0.14.0
github.com/fluxcd/pkg/ssh v0.21.0
github.com/fluxcd/pkg/tar v0.14.0
@@ -53,7 +53,6 @@ require (
github.com/notaryproject/notation-go v1.3.2
github.com/onsi/gomega v1.38.2
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/go-digest/blake3 v0.0.0-20250116041648-1e56c6daea3b
github.com/opencontainers/image-spec v1.1.1
github.com/ory/dockertest/v3 v3.12.0
github.com/otiai10/copy v1.14.1
@@ -62,7 +61,7 @@ require (
github.com/sigstore/cosign/v2 v2.5.2
github.com/sigstore/sigstore v1.9.5
github.com/sirupsen/logrus v1.9.3
github.com/spf13/pflag v1.0.7
github.com/spf13/pflag v1.0.10
golang.org/x/crypto v0.41.0
golang.org/x/oauth2 v0.30.0
golang.org/x/sync v0.16.0
@@ -193,6 +192,7 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fluxcd/gitkit v0.6.0 // indirect
github.com/fluxcd/pkg/apis/acl v0.9.0 // indirect
github.com/fluxcd/pkg/lockedfile v0.7.0 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.7 // indirect
@@ -296,6 +296,7 @@ require (
github.com/nozzle/throttler v0.0.0-20180817012639-2ea982251481 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/oleiade/reflections v1.1.0 // indirect
github.com/opencontainers/go-digest/blake3 v0.0.0-20250813155314-89707e38ad1a // indirect
github.com/opencontainers/runc v1.2.4 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/otiai10/mint v1.6.3 // indirect

go.sum (22 lines changed)

@@ -321,8 +321,8 @@ github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5Qvfr
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI=
github.com/dlclark/regexp2 v1.11.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/docker/cli v28.3.3+incompatible h1:fp9ZHAr1WWPGdIWBM1b3zLtgCF+83gRdVMTJsUeiyAo=
github.com/docker/cli v28.3.3+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/cli v28.4.0+incompatible h1:RBcf3Kjw2pMtwui5V0DIMdyeab8glEw5QY0UUU4C9kY=
github.com/docker/cli v28.4.0+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk=
github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v28.3.3+incompatible h1:Dypm25kh4rmk49v1eiVbsAtpAsYURjYkaKubwuBdxEI=
@@ -378,6 +378,8 @@ github.com/fluxcd/pkg/apis/event v0.19.0 h1:ZJU2voontkzp5rNYA4JMOu40S4tRcrWi4Do5
github.com/fluxcd/pkg/apis/event v0.19.0/go.mod h1:deuIyUb6lh+Z1Ccvwwxhm1wNM3kpSo+vF1IgRnpaZfQ=
github.com/fluxcd/pkg/apis/meta v1.21.0 h1:R+bN02chcs0HUmyVDQhqe/FHmYLjipVDMLnyYfNX850=
github.com/fluxcd/pkg/apis/meta v1.21.0/go.mod h1:XUAEUgT4gkWDAEN79E141tmL+v4SV50tVZ/Ojpc/ueg=
github.com/fluxcd/pkg/artifact v0.2.0 h1:y4j+c2v1qzXEgtQSAQbqAvvvdaUckQ7NxaWWobhNgm4=
github.com/fluxcd/pkg/artifact v0.2.0/go.mod h1:+L19/j8WPJ/blBZ/BFE+NhX6dja9Na1kTJkvZgbblbY=
github.com/fluxcd/pkg/auth v0.30.0 h1:7JMnY1ClArvOsadt6hOxceu8Q2hLsYHFMt0DV3BQl4Q=
github.com/fluxcd/pkg/auth v0.30.0/go.mod h1:me38o1nDfSLw6YvnkT9Ce/zqJZICZSA7j5pNMR3JUbc=
github.com/fluxcd/pkg/cache v0.11.0 h1:fsE8S+una21fSNw4MDXGUIf0Gf1J+pqa4RbsVKf2aTI=
@@ -396,10 +398,10 @@ github.com/fluxcd/pkg/lockedfile v0.7.0 h1:tmzW2GeMGuJMiCcVloXVd1vKZ92anm9WGkRgO
github.com/fluxcd/pkg/lockedfile v0.7.0/go.mod h1:AzCV/h1N3hi/KtUDUCUgS8hl1+a1y+I6pmRo25dxdK0=
github.com/fluxcd/pkg/masktoken v0.8.0 h1:Dm5xIVNbg0s6zNttjDvimaG38bKsXwxBVo5b+D7ThVU=
github.com/fluxcd/pkg/masktoken v0.8.0/go.mod h1:Gc73ALOqIe+5Gj2V3JggMNiYcBiZ9bNNDYBE9R5XTTg=
github.com/fluxcd/pkg/oci v0.54.0 h1:s9INS1xocek9Lijob/Pq8xGx+TUA1NInmImY1Cw1DQA=
github.com/fluxcd/pkg/oci v0.54.0/go.mod h1:Z0QAwiC3E8aG4ggFGub1lKhIS++rfcMmrrUt4VSEQ38=
github.com/fluxcd/pkg/runtime v0.82.0 h1:VdPPRJtj8/rcBdqY7GZSffoxe5elFHt+ymwQHNbPOlc=
github.com/fluxcd/pkg/runtime v0.82.0/go.mod h1:rIDynMhU5upbn8ce3bXQhH5L6vtDw5MELycvtJG/+og=
github.com/fluxcd/pkg/oci v0.55.0 h1:7/EpGRv/5KtWFu9/bXozxR4Nu3V76TNuuN/0lII51G8=
github.com/fluxcd/pkg/oci v0.55.0/go.mod h1:roi2GxtkGBcOYCXnPw1VJvxllgAZ/pqTCCSm9bZY9Bs=
github.com/fluxcd/pkg/runtime v0.83.0 h1:XzpwKzo7GqfBE/BKpxG5B4U7cUnojnB407S9Dpp6oLU=
github.com/fluxcd/pkg/runtime v0.83.0/go.mod h1:r8KLvXRguKtpLAa66fA19rIbwPViXm8az038IUabYvw=
github.com/fluxcd/pkg/sourceignore v0.14.0 h1:ZiZzbXtXb/Qp7I7JCStsxOlX8ri8rWwCvmvIrJ0UzQQ=
github.com/fluxcd/pkg/sourceignore v0.14.0/go.mod h1:E3zKvyTyB+oQKqm/2I/jS6Rrt3B7fNuig/4bY2vi3bg=
github.com/fluxcd/pkg/ssh v0.21.0 h1:ZmyF0n9je0cTTkOpvFVgIhmdx9qtswnVE60TK4IzJh0=
@@ -809,8 +811,8 @@ github.com/open-policy-agent/opa v1.5.1 h1:LTxxBJusMVjfs67W4FoRcnMfXADIGFMzpqnfk
github.com/open-policy-agent/opa v1.5.1/go.mod h1:bYbS7u+uhTI+cxHQIpzvr5hxX0hV7urWtY+38ZtjMgk=
github.com/opencontainers/go-digest v1.0.1-0.20220411205349-bde1400a84be h1:f2PlhC9pm5sqpBZFvnAoKj+KzXRzbjFMA+TqXfJdgho=
github.com/opencontainers/go-digest v1.0.1-0.20220411205349-bde1400a84be/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/go-digest/blake3 v0.0.0-20250116041648-1e56c6daea3b h1:nAiL9bmUK4IzFrKoVMRykv0iYGdoit5vpbPaVCZ+fI4=
github.com/opencontainers/go-digest/blake3 v0.0.0-20250116041648-1e56c6daea3b/go.mod h1:kqQaIc6bZstKgnGpL7GD5dWoLKbA6mH1Y9ULjGImBnM=
github.com/opencontainers/go-digest/blake3 v0.0.0-20250813155314-89707e38ad1a h1:IAncDmJeD90l6+YR1Gf6r0HrmnRmOatzPfUpMS80ZTI=
github.com/opencontainers/go-digest/blake3 v0.0.0-20250813155314-89707e38ad1a/go.mod h1:kqQaIc6bZstKgnGpL7GD5dWoLKbA6mH1Y9ULjGImBnM=
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M=
github.com/opencontainers/runc v1.2.4 h1:yWFgLkghp71D76Fa0l349yAl5g4Gse7DPYNlvkQ9Eiw=
@@ -953,8 +955,8 @@ github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cA
github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo=
github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0=
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/pflag v1.0.7 h1:vN6T9TfwStFPFM5XzjsvmzZkLuaLX+HS+0SeFLRgU6M=
github.com/spf13/pflag v1.0.7/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk=
github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4=
github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4=
github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=


@@ -44,6 +44,8 @@ import (
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
intdigest "github.com/fluxcd/pkg/artifact/digest"
"github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/auth"
"github.com/fluxcd/pkg/cache"
"github.com/fluxcd/pkg/runtime/conditions"
@@ -59,12 +61,10 @@ import (
"github.com/fluxcd/source-controller/internal/bucket/azure"
"github.com/fluxcd/source-controller/internal/bucket/gcp"
"github.com/fluxcd/source-controller/internal/bucket/minio"
intdigest "github.com/fluxcd/source-controller/internal/digest"
serror "github.com/fluxcd/source-controller/internal/error"
"github.com/fluxcd/source-controller/internal/index"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/internal/storage"
)
// maxConcurrentBucketFetches is the upper bound on the goroutines used to


@@ -38,6 +38,8 @@ import (
kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/pkg/apis/meta"
intdigest "github.com/fluxcd/pkg/artifact/digest"
"github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/auth"
"github.com/fluxcd/pkg/runtime/conditions"
conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check"
@@ -45,13 +47,11 @@ import (
"github.com/fluxcd/pkg/runtime/patch"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
intdigest "github.com/fluxcd/source-controller/internal/digest"
"github.com/fluxcd/source-controller/internal/index"
gcsmock "github.com/fluxcd/source-controller/internal/mock/gcs"
s3mock "github.com/fluxcd/source-controller/internal/mock/s3"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/internal/storage"
)
// Environment variable to set the GCP Storage host for the GCP client.


@@ -49,6 +49,7 @@ import (
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/cache"
"github.com/fluxcd/pkg/git"
"github.com/fluxcd/pkg/git/gogit"
@@ -59,7 +60,6 @@ import (
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/predicates"
rreconcile "github.com/fluxcd/pkg/runtime/reconcile"
"github.com/fluxcd/pkg/sourceignore"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
@@ -67,7 +67,6 @@ import (
"github.com/fluxcd/source-controller/internal/features"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/internal/storage"
"github.com/fluxcd/source-controller/internal/util"
)


@@ -59,12 +59,12 @@
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"
intstorage "github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/gittestserver"
"github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/testenv"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
intstorage "github.com/fluxcd/source-controller/internal/storage"
)
var (


@@ -48,6 +48,7 @@ import (
kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/auth"
"github.com/fluxcd/pkg/git"
"github.com/fluxcd/pkg/git/github"
@@ -64,7 +65,6 @@ import (
"github.com/fluxcd/source-controller/internal/features"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/internal/storage"
)
const (
@@ -1503,6 +1503,8 @@ func TestGitRepositoryReconciler_reconcileInclude(t *testing.T) {
server, err := testserver.NewTempArtifactServer()
g.Expect(err).NotTo(HaveOccurred())
server.Start()
defer server.Stop()
storage, err := newTestStorage(server.HTTPServer)
g.Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(storage.BasePath)


@@ -55,6 +55,7 @@ import (
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/git"
"github.com/fluxcd/pkg/runtime/conditions"
helper "github.com/fluxcd/pkg/runtime/controller"
@@ -75,7 +76,6 @@ import (
"github.com/fluxcd/source-controller/internal/oci/notation"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/internal/storage"
"github.com/fluxcd/source-controller/internal/util"
)


@@ -34,6 +34,8 @@ import (
"testing"
"time"
"github.com/fluxcd/pkg/artifact/config"
"github.com/fluxcd/pkg/artifact/digest"
"github.com/notaryproject/notation-core-go/signature/cose"
"github.com/notaryproject/notation-core-go/testhelper"
"github.com/notaryproject/notation-go"
@@ -61,6 +63,7 @@ import (
kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/helmtestserver"
"github.com/fluxcd/pkg/runtime/conditions"
conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check"
@@ -77,7 +80,6 @@ import (
snotation "github.com/fluxcd/source-controller/internal/oci/notation"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/internal/storage"
)
func TestHelmChartReconciler_deleteBeforeFinalizer(t *testing.T) {
@@ -571,14 +573,22 @@ func TestHelmChartReconciler_reconcileSource(t *testing.T) {
tmpDir := t.TempDir()
storage, err := storage.New(tmpDir, "example.com", retentionTTL, retentionRecords)
opts := &config.Options{
StoragePath: tmpDir,
StorageAddress: "example.com",
StorageAdvAddress: "example.com",
ArtifactRetentionTTL: retentionTTL,
ArtifactRetentionRecords: retentionRecords,
ArtifactDigestAlgo: digest.Canonical.String(),
}
st, err := storage.New(opts)
g.Expect(err).ToNot(HaveOccurred())
gitArtifact := &meta.Artifact{
Revision: "mock-ref/abcdefg12345678",
Path: "mock.tgz",
}
g.Expect(storage.Archive(gitArtifact, "testdata/charts", nil)).To(Succeed())
g.Expect(st.Archive(gitArtifact, "testdata/charts", nil)).To(Succeed())
tests := []struct {
name string
@@ -785,7 +795,7 @@ func TestHelmChartReconciler_reconcileSource(t *testing.T) {
r := &HelmChartReconciler{
Client: clientBuilder.Build(),
EventRecorder: record.NewFakeRecorder(32),
Storage: storage,
Storage: st,
patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"),
}
@@ -1115,14 +1125,14 @@ func TestHelmChartReconciler_buildFromHelmRepository(t *testing.T) {
clientBuilder.WithObjects(tt.secret.DeepCopy())
}
storage, err := newTestStorage(server)
testStorage, err := newTestStorage(server)
g.Expect(err).ToNot(HaveOccurred())
r := &HelmChartReconciler{
Client: clientBuilder.Build(),
EventRecorder: record.NewFakeRecorder(32),
Getters: testGetters,
Storage: storage,
Storage: testStorage,
patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"),
}
@@ -1188,14 +1198,22 @@ func TestHelmChartReconciler_buildFromOCIHelmRepository(t *testing.T) {
metadata, err := loadTestChartToOCI(chartData, testRegistryServer, "", "", "")
g.Expect(err).NotTo(HaveOccurred())
storage, err := storage.New(tmpDir, "example.com", retentionTTL, retentionRecords)
opts := &config.Options{
StoragePath: tmpDir,
StorageAddress: "example.com",
StorageAdvAddress: "example.com",
ArtifactRetentionTTL: retentionTTL,
ArtifactRetentionRecords: retentionRecords,
ArtifactDigestAlgo: digest.Canonical.String(),
}
st, err := storage.New(opts)
g.Expect(err).ToNot(HaveOccurred())
cachedArtifact := &meta.Artifact{
Revision: "0.1.0",
Path: metadata.Name + "-" + metadata.Version + ".tgz",
}
g.Expect(storage.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed())
g.Expect(st.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed())
tests := []struct {
name string
@@ -1273,7 +1291,7 @@ func TestHelmChartReconciler_buildFromOCIHelmRepository(t *testing.T) {
assertFunc: func(g *WithT, obj *sourcev1.HelmChart, build chart.Build) {
g.Expect(build.Name).To(Equal(metadata.Name))
g.Expect(build.Version).To(Equal(metadata.Version))
g.Expect(build.Path).To(Equal(storage.LocalPath(*cachedArtifact.DeepCopy())))
g.Expect(build.Path).To(Equal(st.LocalPath(*cachedArtifact.DeepCopy())))
g.Expect(build.Path).To(BeARegularFile())
g.Expect(build.ValuesFiles).To(BeEmpty())
},
@@ -1292,7 +1310,7 @@ func TestHelmChartReconciler_buildFromOCIHelmRepository(t *testing.T) {
assertFunc: func(g *WithT, obj *sourcev1.HelmChart, build chart.Build) {
g.Expect(build.Name).To(Equal(metadata.Name))
g.Expect(build.Version).To(Equal(metadata.Version))
g.Expect(build.Path).ToNot(Equal(storage.LocalPath(*cachedArtifact.DeepCopy())))
g.Expect(build.Path).ToNot(Equal(st.LocalPath(*cachedArtifact.DeepCopy())))
g.Expect(build.Path).To(BeARegularFile())
},
cleanFunc: func(g *WithT, build *chart.Build) {
@@ -1356,7 +1374,7 @@ func TestHelmChartReconciler_buildFromOCIHelmRepository(t *testing.T) {
Client: clientBuilder.Build(),
EventRecorder: record.NewFakeRecorder(32),
Getters: testGetters,
Storage: storage,
Storage: st,
RegistryClientGenerator: registry.ClientGenerator,
patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"),
}
@@ -1411,24 +1429,32 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) {
tmpDir := t.TempDir()
storage, err := storage.New(tmpDir, "example.com", retentionTTL, retentionRecords)
opts := &config.Options{
StoragePath: tmpDir,
StorageAddress: "example.com",
StorageAdvAddress: "example.com",
ArtifactRetentionTTL: retentionTTL,
ArtifactRetentionRecords: retentionRecords,
ArtifactDigestAlgo: digest.Canonical.String(),
}
st, err := storage.New(opts)
g.Expect(err).ToNot(HaveOccurred())
chartsArtifact := &meta.Artifact{
Revision: "mock-ref/abcdefg12345678",
Path: "mock.tgz",
}
g.Expect(storage.Archive(chartsArtifact, "testdata/charts", nil)).To(Succeed())
g.Expect(st.Archive(chartsArtifact, "testdata/charts", nil)).To(Succeed())
yamlArtifact := &meta.Artifact{
Revision: "9876abcd",
Path: "values.yaml",
}
g.Expect(storage.CopyFromPath(yamlArtifact, "testdata/charts/helmchart/values.yaml")).To(Succeed())
g.Expect(st.CopyFromPath(yamlArtifact, "testdata/charts/helmchart/values.yaml")).To(Succeed())
cachedArtifact := &meta.Artifact{
Revision: "0.1.0",
Path: "cached.tgz",
}
g.Expect(storage.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed())
g.Expect(st.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed())
tests := []struct {
name string
@@ -1518,7 +1544,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) {
assertFunc: func(g *WithT, build chart.Build) {
g.Expect(build.Name).To(Equal("helmchart"))
g.Expect(build.Version).To(Equal("0.1.0"))
g.Expect(build.Path).To(Equal(storage.LocalPath(*cachedArtifact.DeepCopy())))
g.Expect(build.Path).To(Equal(st.LocalPath(*cachedArtifact.DeepCopy())))
g.Expect(build.Path).To(BeARegularFile())
g.Expect(build.ValuesFiles).To(BeEmpty())
},
@@ -1535,7 +1561,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) {
assertFunc: func(g *WithT, build chart.Build) {
g.Expect(build.Name).To(Equal("helmchart"))
g.Expect(build.Version).To(Equal("0.1.0"))
g.Expect(build.Path).To(Equal(storage.LocalPath(*cachedArtifact.DeepCopy())))
g.Expect(build.Path).To(Equal(st.LocalPath(*cachedArtifact.DeepCopy())))
g.Expect(build.Path).To(BeARegularFile())
g.Expect(build.ValuesFiles).To(Equal([]string{"values.yaml", "override.yaml"}))
},
@@ -1553,7 +1579,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) {
assertFunc: func(g *WithT, build chart.Build) {
g.Expect(build.Name).To(Equal("helmchart"))
g.Expect(build.Version).To(Equal("0.1.0"))
g.Expect(build.Path).ToNot(Equal(storage.LocalPath(*cachedArtifact.DeepCopy())))
g.Expect(build.Path).ToNot(Equal(st.LocalPath(*cachedArtifact.DeepCopy())))
g.Expect(build.Path).To(BeARegularFile())
g.Expect(build.ValuesFiles).To(BeEmpty())
},
@@ -1590,7 +1616,7 @@ func TestHelmChartReconciler_buildFromTarballArtifact(t *testing.T) {
WithStatusSubresource(&sourcev1.HelmChart{}).
Build(),
EventRecorder: record.NewFakeRecorder(32),
Storage: storage,
Storage: st,
Getters: testGetters,
RegistryClientGenerator: registry.ClientGenerator,
patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"),
@@ -2898,19 +2924,26 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureNotation(t *t
metadata, err := loadTestChartToOCI(chartData, server, "", "", "")
g.Expect(err).NotTo(HaveOccurred())
storage, err := storage.New(tmpDir, server.registryHost, retentionTTL, retentionRecords)
opts := &config.Options{
StoragePath: tmpDir,
StorageAddress: server.registryHost,
ArtifactRetentionTTL: retentionTTL,
ArtifactRetentionRecords: retentionRecords,
ArtifactDigestAlgo: digest.Canonical.String(),
}
st, err := storage.New(opts)
g.Expect(err).ToNot(HaveOccurred())
cachedArtifact := &meta.Artifact{
Revision: "0.1.0",
Path: metadata.Name + "-" + metadata.Version + ".tgz",
}
g.Expect(storage.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed())
g.Expect(st.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed())
certTuple := testhelper.GetRSASelfSignedSigningCertTuple("notation self-signed certs for testing")
certs := []*x509.Certificate{certTuple.Cert}
signer, err := signer.New(certTuple.PrivateKey, certs)
sg, err := signer.New(certTuple.PrivateKey, certs)
g.Expect(err).ToNot(HaveOccurred())
policyDocument := trustpolicy.Document{
@@ -3120,7 +3153,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureNotation(t *t
Client: clientBuilder.Build(),
EventRecorder: record.NewFakeRecorder(32),
Getters: testGetters,
Storage: storage,
Storage: st,
RegistryClientGenerator: registry.ClientGenerator,
patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"),
}
@@ -3162,7 +3195,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureNotation(t *t
ArtifactReference: artifact,
}
_, err = notation.Sign(ctx, signer, repo, signOptions)
_, err = notation.Sign(ctx, sg, repo, signOptions)
g.Expect(err).ToNot(HaveOccurred())
}
@@ -3222,14 +3255,21 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureCosign(t *tes
metadata, err := loadTestChartToOCI(chartData, server, "", "", "")
g.Expect(err).NotTo(HaveOccurred())
storage, err := storage.New(tmpDir, server.registryHost, retentionTTL, retentionRecords)
opts := &config.Options{
StoragePath: tmpDir,
StorageAddress: server.registryHost,
ArtifactRetentionTTL: retentionTTL,
ArtifactRetentionRecords: retentionRecords,
ArtifactDigestAlgo: digest.Canonical.String(),
}
st, err := storage.New(opts)
g.Expect(err).ToNot(HaveOccurred())
cachedArtifact := &meta.Artifact{
Revision: "0.1.0",
Path: metadata.Name + "-" + metadata.Version + ".tgz",
}
g.Expect(storage.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed())
g.Expect(st.CopyFromPath(cachedArtifact, "testdata/charts/helmchart-0.1.0.tgz")).To(Succeed())
pf := func(b bool) ([]byte, error) {
return []byte("cosign-password"), nil
@@ -3365,7 +3405,7 @@ func TestHelmChartReconciler_reconcileSourceFromOCI_verifySignatureCosign(t *tes
Client: clientBuilder.Build(),
EventRecorder: record.NewFakeRecorder(32),
Getters: testGetters,
Storage: storage,
Storage: st,
RegistryClientGenerator: registry.ClientGenerator,
patchOptions: getPatchOptions(helmChartReadyCondition.Owned, "sc"),
}


@@ -42,6 +42,8 @@ import (
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
intdigest "github.com/fluxcd/pkg/artifact/digest"
"github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/runtime/conditions"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/jitter"
@@ -51,14 +53,12 @@ import (
sourcev1 "github.com/fluxcd/source-controller/api/v1"
"github.com/fluxcd/source-controller/internal/cache"
intdigest "github.com/fluxcd/source-controller/internal/digest"
serror "github.com/fluxcd/source-controller/internal/error"
"github.com/fluxcd/source-controller/internal/helm/getter"
"github.com/fluxcd/source-controller/internal/helm/repository"
intpredicates "github.com/fluxcd/source-controller/internal/predicates"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/internal/storage"
)
// helmRepositoryReadyCondition contains the information required to summarize a


@@ -43,6 +43,8 @@ import (
kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/pkg/apis/meta"
intdigest "github.com/fluxcd/pkg/artifact/digest"
"github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/helmtestserver"
"github.com/fluxcd/pkg/runtime/conditions"
conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check"
@@ -51,12 +53,10 @@ import (
sourcev1 "github.com/fluxcd/source-controller/api/v1"
"github.com/fluxcd/source-controller/internal/cache"
intdigest "github.com/fluxcd/source-controller/internal/digest"
"github.com/fluxcd/source-controller/internal/helm/repository"
intpredicates "github.com/fluxcd/source-controller/internal/predicates"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/internal/storage"
)
func TestHelmRepositoryReconciler_deleteBeforeFinalizer(t *testing.T) {


@@ -50,6 +50,7 @@ import (
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/auth"
"github.com/fluxcd/pkg/cache"
"github.com/fluxcd/pkg/oci"
@@ -77,7 +78,6 @@ import (
"github.com/fluxcd/source-controller/internal/oci/notation"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/internal/storage"
"github.com/fluxcd/source-controller/internal/util"
)


@@ -60,6 +60,8 @@ import (
kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/pkg/apis/meta"
intdigest "github.com/fluxcd/pkg/artifact/digest"
"github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/auth"
"github.com/fluxcd/pkg/git"
"github.com/fluxcd/pkg/oci"
@@ -69,11 +71,9 @@ import (
"github.com/fluxcd/pkg/tar"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
intdigest "github.com/fluxcd/source-controller/internal/digest"
serror "github.com/fluxcd/source-controller/internal/error"
snotation "github.com/fluxcd/source-controller/internal/oci/notation"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/storage"
testproxy "github.com/fluxcd/source-controller/tests/proxy"
)


@@ -32,6 +32,10 @@ import (
"testing"
"time"
"github.com/distribution/distribution/v3/configuration"
dockerRegistry "github.com/distribution/distribution/v3/registry"
_ "github.com/distribution/distribution/v3/registry/auth/htpasswd"
_ "github.com/distribution/distribution/v3/registry/storage/driver/inmemory"
"github.com/foxcpp/go-mockdns"
"github.com/phayes/freeport"
"github.com/sirupsen/logrus"
@@ -45,11 +49,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"
"github.com/distribution/distribution/v3/configuration"
dockerRegistry "github.com/distribution/distribution/v3/registry"
_ "github.com/distribution/distribution/v3/registry/auth/htpasswd"
_ "github.com/distribution/distribution/v3/registry/storage/driver/inmemory"
"github.com/fluxcd/pkg/artifact/config"
"github.com/fluxcd/pkg/artifact/digest"
"github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/testenv"
@@ -57,7 +59,6 @@ import (
sourcev1 "github.com/fluxcd/source-controller/api/v1"
"github.com/fluxcd/source-controller/internal/cache"
"github.com/fluxcd/source-controller/internal/storage"
// +kubebuilder:scaffold:imports
)
@@ -432,7 +433,15 @@ func initTestTLS() {
}
func newTestStorage(s *testserver.HTTPServer) (*storage.Storage, error) {
st, err := storage.New(s.Root(), s.URL(), retentionTTL, retentionRecords)
opts := &config.Options{
StoragePath: s.Root(),
StorageAddress: s.URL(),
StorageAdvAddress: s.URL(),
ArtifactRetentionTTL: retentionTTL,
ArtifactRetentionRecords: retentionRecords,
ArtifactDigestAlgo: digest.Canonical.String(),
}
st, err := storage.New(opts)
if err != nil {
return nil, err
}


@@ -1,52 +0,0 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package digest
import (
"crypto"
_ "crypto/sha1"
_ "crypto/sha256"
_ "crypto/sha512"
"fmt"
"github.com/opencontainers/go-digest"
_ "github.com/opencontainers/go-digest/blake3"
)
const (
SHA1 digest.Algorithm = "sha1"
)
var (
// Canonical is the primary digest algorithm used to calculate checksums.
Canonical = digest.SHA256
)
func init() {
// Register SHA-1 algorithm for support of e.g. Git commit SHAs.
digest.RegisterAlgorithm(SHA1, crypto.SHA1)
}
// AlgorithmForName returns the digest algorithm for the given name, or an
// error of type digest.ErrDigestUnsupported if the algorithm is unavailable.
func AlgorithmForName(name string) (digest.Algorithm, error) {
a := digest.Algorithm(name)
if !a.Available() {
return "", fmt.Errorf("%w: %s", digest.ErrDigestUnsupported, name)
}
return a, nil
}
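
The hunk above deletes the internal digest helpers; as the import rewrites earlier in this diff show, call sites now alias `github.com/fluxcd/pkg/artifact/digest` as `intdigest` instead. A short sketch of the replacement package, assuming it keeps the `Canonical` and `AlgorithmForName` identifiers removed here (`digest.Canonical.String()` is confirmed by the test changes above; `AlgorithmForName` parity is an assumption):

```go
package main

import (
	"fmt"

	"github.com/fluxcd/pkg/artifact/digest"
)

func main() {
	// Canonical is the default checksum algorithm (sha256).
	fmt.Println(digest.Canonical.String())

	// AlgorithmForName resolves registered algorithms by name, e.g. "sha1"
	// (registered for Git commit SHAs) or "blake3"; unknown names return an
	// error wrapping go-digest's ErrDigestUnsupported. Assumed to match the
	// internal helper deleted above.
	algo, err := digest.AlgorithmForName("sha1")
	if err != nil {
		panic(err)
	}
	fmt.Println(algo.FromString("hello world")) // sha1:2aae6c35...
}
```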


@@ -1,71 +0,0 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package digest
import (
"errors"
"testing"
. "github.com/onsi/gomega"
"github.com/opencontainers/go-digest"
)
func TestAlgorithmForName(t *testing.T) {
tests := []struct {
name string
want digest.Algorithm
wantErr error
}{
{
name: "sha256",
want: digest.SHA256,
},
{
name: "sha384",
want: digest.SHA384,
},
{
name: "sha512",
want: digest.SHA512,
},
{
name: "blake3",
want: digest.BLAKE3,
},
{
name: "sha1",
want: SHA1,
},
{
name: "not-available",
wantErr: digest.ErrDigestUnsupported,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
got, err := AlgorithmForName(tt.name)
if tt.wantErr != nil {
g.Expect(err).To(HaveOccurred())
g.Expect(errors.Is(err, tt.wantErr)).To(BeTrue())
return
}
g.Expect(err).ToNot(HaveOccurred())
g.Expect(got).To(Equal(tt.want))
})
}
}


@@ -1,71 +0,0 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package digest
import (
"fmt"
"io"
"github.com/opencontainers/go-digest"
)
// MultiDigester is a digester that writes to multiple digesters to calculate
// the checksum of different algorithms.
type MultiDigester struct {
d map[digest.Algorithm]digest.Digester
}
// NewMultiDigester returns a new MultiDigester that writes to newly
// initialized digesters for the given algorithms. If a provided algorithm is
// not available, it returns a digest.ErrDigestUnsupported error.
func NewMultiDigester(algos ...digest.Algorithm) (*MultiDigester, error) {
d := make(map[digest.Algorithm]digest.Digester, len(algos))
for _, a := range algos {
if _, ok := d[a]; ok {
continue
}
if !a.Available() {
return nil, fmt.Errorf("%w: %s", digest.ErrDigestUnsupported, a)
}
d[a] = a.Digester()
}
return &MultiDigester{d: d}, nil
}
// Write writes p to all underlying digesters.
func (w *MultiDigester) Write(p []byte) (n int, err error) {
for _, d := range w.d {
n, err = d.Hash().Write(p)
if err != nil {
return
}
if n != len(p) {
err = io.ErrShortWrite
return
}
}
return len(p), nil
}
// Digest returns the digest of the data written to the digester of the given
// algorithm, or an empty digest if the algorithm is not available.
func (w *MultiDigester) Digest(algo digest.Algorithm) digest.Digest {
if d, ok := w.d[algo]; ok {
return d.Digest()
}
return ""
}
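
`MultiDigester` (removed above) hashes one stream with several algorithms in a single pass, so the data is only read once. Assuming the type moves to `github.com/fluxcd/pkg/artifact/digest` unchanged, usage looks like this sketch; the sha256 value matches the table in the deleted test file below:

```go
package main

import (
	"fmt"

	godigest "github.com/opencontainers/go-digest"

	"github.com/fluxcd/pkg/artifact/digest"
)

func main() {
	// NewMultiDigester rejects unavailable algorithms up front; a single
	// Write then feeds every underlying digester.
	md, err := digest.NewMultiDigester(digest.Canonical, godigest.SHA512)
	if err != nil {
		panic(err)
	}
	if _, err := md.Write([]byte("hello world")); err != nil {
		panic(err)
	}

	fmt.Println(md.Digest(digest.Canonical))
	// sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9
	fmt.Println(md.Digest(godigest.SHA512))
}
```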


@@ -1,128 +0,0 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package digest
import (
"crypto/rand"
"testing"
. "github.com/onsi/gomega"
"github.com/opencontainers/go-digest"
)
func TestNewMultiDigester(t *testing.T) {
t.Run("constructs a MultiDigester", func(t *testing.T) {
g := NewWithT(t)
d, err := NewMultiDigester(Canonical, digest.SHA512)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(d.d).To(HaveLen(2))
})
t.Run("returns an error if an algorithm is not available", func(t *testing.T) {
g := NewWithT(t)
_, err := NewMultiDigester(digest.Algorithm("not-available"))
g.Expect(err).To(HaveOccurred())
})
}
func TestMultiDigester_Write(t *testing.T) {
t.Run("writes to all digesters", func(t *testing.T) {
g := NewWithT(t)
d, err := NewMultiDigester(Canonical, digest.SHA512)
g.Expect(err).ToNot(HaveOccurred())
n, err := d.Write([]byte("hello"))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(n).To(Equal(5))
n, err = d.Write([]byte(" world"))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(n).To(Equal(6))
g.Expect(d.Digest(Canonical)).To(BeEquivalentTo("sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"))
g.Expect(d.Digest(digest.SHA512)).To(BeEquivalentTo("sha512:309ecc489c12d6eb4cc40f50c902f2b4d0ed77ee511a7c7a9bcd3ca86d4cd86f989dd35bc5ff499670da34255b45b0cfd830e81f605dcf7dc5542e93ae9cd76f"))
})
}
func TestMultiDigester_Digest(t *testing.T) {
t.Run("returns the digest for the given algorithm", func(t *testing.T) {
g := NewWithT(t)
d, err := NewMultiDigester(Canonical, digest.SHA512)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(d.Digest(Canonical)).To(BeEquivalentTo("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"))
g.Expect(d.Digest(digest.SHA512)).To(BeEquivalentTo("sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"))
})
t.Run("returns an empty digest if the algorithm is not supported", func(t *testing.T) {
g := NewWithT(t)
d, err := NewMultiDigester(Canonical, digest.SHA512)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(d.Digest(digest.Algorithm("not-available"))).To(BeEmpty())
})
}
func benchmarkMultiDigesterWrite(b *testing.B, algos []digest.Algorithm, pSize int64) {
md, err := NewMultiDigester(algos...)
if err != nil {
b.Fatal(err)
}
p := make([]byte, pSize)
if _, err = rand.Read(p); err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
md.Write(p)
}
}
func BenchmarkMultiDigester_Write(b *testing.B) {
const pSize = 1024 * 2
b.Run("sha1", func(b *testing.B) {
benchmarkMultiDigesterWrite(b, []digest.Algorithm{SHA1}, pSize)
})
b.Run("sha256", func(b *testing.B) {
benchmarkMultiDigesterWrite(b, []digest.Algorithm{digest.SHA256}, pSize)
})
b.Run("blake3", func(b *testing.B) {
benchmarkMultiDigesterWrite(b, []digest.Algorithm{digest.BLAKE3}, pSize)
})
b.Run("sha256+sha384", func(b *testing.B) {
benchmarkMultiDigesterWrite(b, []digest.Algorithm{digest.SHA256, digest.SHA384}, pSize)
})
b.Run("sha256+sha512", func(b *testing.B) {
benchmarkMultiDigesterWrite(b, []digest.Algorithm{digest.SHA256, digest.SHA512}, pSize)
})
b.Run("sha256+blake3", func(b *testing.B) {
benchmarkMultiDigesterWrite(b, []digest.Algorithm{digest.SHA256, digest.BLAKE3}, pSize)
})
}


@@ -1,733 +0,0 @@
/*
Copyright 2025 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"archive/tar"
"compress/gzip"
"context"
"fmt"
"io"
"io/fs"
"net/url"
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"
securejoin "github.com/cyphar/filepath-securejoin"
"github.com/fluxcd/pkg/apis/meta"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
"github.com/opencontainers/go-digest"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"github.com/fluxcd/pkg/lockedfile"
"github.com/fluxcd/pkg/oci"
"github.com/fluxcd/pkg/sourceignore"
pkgtar "github.com/fluxcd/pkg/tar"
intdigest "github.com/fluxcd/source-controller/internal/digest"
)
const GarbageCountLimit = 1000
const (
// defaultFileMode is the permission mode applied to files inside an artifact archive.
defaultFileMode int64 = 0o600
// defaultDirMode is the permission mode applied to all directories inside an artifact archive.
defaultDirMode int64 = 0o750
// defaultExeFileMode is the permission mode applied to executable files inside an artifact archive.
defaultExeFileMode int64 = 0o700
)
// Storage manages artifacts
type Storage struct {
// BasePath is the local directory path where the source artifacts are stored.
BasePath string `json:"basePath"`
// Hostname is the file server host name used to compose the artifacts URIs.
Hostname string `json:"hostname"`
// ArtifactRetentionTTL is the duration of time that artifacts will be kept
// in storage before being garbage collected.
ArtifactRetentionTTL time.Duration `json:"artifactRetentionTTL"`
// ArtifactRetentionRecords is the maximum number of artifacts to be kept in
// storage after a garbage collection.
ArtifactRetentionRecords int `json:"artifactRetentionRecords"`
}
// New creates the storage helper for a given path and hostname.
func New(basePath string, hostname string, artifactRetentionTTL time.Duration, artifactRetentionRecords int) (*Storage, error) {
if f, err := os.Stat(basePath); os.IsNotExist(err) || !f.IsDir() {
return nil, fmt.Errorf("invalid dir path: %s", basePath)
}
return &Storage{
BasePath: basePath,
Hostname: hostname,
ArtifactRetentionTTL: artifactRetentionTTL,
ArtifactRetentionRecords: artifactRetentionRecords,
}, nil
}
// NewArtifactFor returns a new meta.Artifact.
func (s Storage) NewArtifactFor(kind string, metadata metav1.Object, revision, fileName string) meta.Artifact {
path := ArtifactPath(kind, metadata.GetNamespace(), metadata.GetName(), fileName)
artifact := meta.Artifact{
Path: path,
Revision: revision,
}
s.SetArtifactURL(&artifact)
return artifact
}
// SetArtifactURL sets the URL on the given meta.Artifact.
func (s Storage) SetArtifactURL(artifact *meta.Artifact) {
if artifact.Path == "" {
return
}
format := "http://%s/%s"
if strings.HasPrefix(s.Hostname, "http://") || strings.HasPrefix(s.Hostname, "https://") {
format = "%s/%s"
}
artifact.URL = fmt.Sprintf(format, s.Hostname, strings.TrimLeft(artifact.Path, "/"))
}
// SetHostname sets the hostname of the given URL string to the current Storage.Hostname and returns the result.
func (s Storage) SetHostname(URL string) string {
u, err := url.Parse(URL)
if err != nil {
return ""
}
u.Host = s.Hostname
return u.String()
}
// MkdirAll calls os.MkdirAll for the given meta.Artifact base dir.
func (s Storage) MkdirAll(artifact meta.Artifact) error {
dir := filepath.Dir(s.LocalPath(artifact))
return os.MkdirAll(dir, 0o700)
}
// Remove calls os.Remove for the given meta.Artifact path.
func (s Storage) Remove(artifact meta.Artifact) error {
return os.Remove(s.LocalPath(artifact))
}
// RemoveAll calls os.RemoveAll for the given meta.Artifact base dir.
func (s Storage) RemoveAll(artifact meta.Artifact) (string, error) {
var deletedDir string
dir := filepath.Dir(s.LocalPath(artifact))
// Check if the dir exists.
_, err := os.Stat(dir)
if err == nil {
deletedDir = dir
}
return deletedDir, os.RemoveAll(dir)
}
// RemoveAllButCurrent removes all files for the given meta.Artifact base dir, excluding the current one.
func (s Storage) RemoveAllButCurrent(artifact meta.Artifact) ([]string, error) {
deletedFiles := []string{}
localPath := s.LocalPath(artifact)
dir := filepath.Dir(localPath)
var errors []string
_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
errors = append(errors, err.Error())
return nil
}
if path != localPath && !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink {
if err := os.Remove(path); err != nil {
errors = append(errors, info.Name())
} else {
// Collect the successfully deleted file paths.
deletedFiles = append(deletedFiles, path)
}
}
return nil
})
if len(errors) > 0 {
return deletedFiles, fmt.Errorf("failed to remove files: %s", strings.Join(errors, " "))
}
return deletedFiles, nil
}
// getGarbageFiles returns all files that need to be garbage collected for the given artifact.
// Garbage files are determined based on the below flow:
// 1. collect all artifact files with an expired ttl
// 2. if we satisfy maxItemsToBeRetained, then return
// 3. else, collect all artifact files till the latest n files remain, where n=maxItemsToBeRetained
func (s Storage) getGarbageFiles(artifact meta.Artifact, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration) (garbageFiles []string, _ error) {
localPath := s.LocalPath(artifact)
dir := filepath.Dir(localPath)
artifactFilesWithCreatedTs := make(map[time.Time]string)
// sortedPaths contain all files sorted according to their created ts.
sortedPaths := []string{}
now := time.Now().UTC()
totalArtifactFiles := 0
var errors []string
creationTimestamps := []time.Time{}
_ = filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
errors = append(errors, err.Error())
return nil
}
if totalArtifactFiles >= totalCountLimit {
return fmt.Errorf("reached file walking limit, already walked over: %d", totalArtifactFiles)
}
info, err := d.Info()
if err != nil {
errors = append(errors, err.Error())
return nil
}
createdAt := info.ModTime().UTC()
diff := now.Sub(createdAt)
// Compare the time difference between now and the time at which the file was created
// with the provided TTL. Delete if the difference is greater than the TTL. Since the
// below logic just deals with determining if an artifact needs to be garbage collected,
// we avoid all lock files, adding them at the end to the list of garbage files.
expired := diff > ttl
if !info.IsDir() && info.Mode()&os.ModeSymlink != os.ModeSymlink && filepath.Ext(path) != ".lock" {
if path != localPath && expired {
garbageFiles = append(garbageFiles, path)
}
totalArtifactFiles += 1
artifactFilesWithCreatedTs[createdAt] = path
creationTimestamps = append(creationTimestamps, createdAt)
}
return nil
})
if len(errors) > 0 {
return nil, fmt.Errorf("can't walk over file: %s", strings.Join(errors, ","))
}
// We already collected enough garbage files to satisfy the no. of max
// items that are supposed to be retained, so exit early.
if totalArtifactFiles-len(garbageFiles) < maxItemsToBeRetained {
return garbageFiles, nil
}
// sort all timestamps in ascending order.
sort.Slice(creationTimestamps, func(i, j int) bool { return creationTimestamps[i].Before(creationTimestamps[j]) })
for _, ts := range creationTimestamps {
path, ok := artifactFilesWithCreatedTs[ts]
if !ok {
return garbageFiles, fmt.Errorf("failed to fetch file for created ts: %v", ts)
}
sortedPaths = append(sortedPaths, path)
}
var collected int
noOfGarbageFiles := len(garbageFiles)
for _, path := range sortedPaths {
if path != localPath && filepath.Ext(path) != ".lock" && !stringInSlice(path, garbageFiles) {
// If we previously collected some garbage files with an expired ttl, then take that into account
// when checking whether we need to remove more files to satisfy the max no. of items allowed
// in the filesystem, along with the no. of files already removed in this loop.
if noOfGarbageFiles > 0 {
if (len(sortedPaths) - collected - len(garbageFiles)) > maxItemsToBeRetained {
garbageFiles = append(garbageFiles, path)
collected += 1
}
} else {
if len(sortedPaths)-collected > maxItemsToBeRetained {
garbageFiles = append(garbageFiles, path)
collected += 1
}
}
}
}
return garbageFiles, nil
}
// GarbageCollect removes all garbage files in the artifact dir according to the provided
// retention options.
func (s Storage) GarbageCollect(ctx context.Context, artifact meta.Artifact, timeout time.Duration) ([]string, error) {
delFilesChan := make(chan []string)
errChan := make(chan error)
// Abort if it takes more than the provided timeout duration.
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
go func() {
garbageFiles, err := s.getGarbageFiles(artifact, GarbageCountLimit, s.ArtifactRetentionRecords, s.ArtifactRetentionTTL)
if err != nil {
errChan <- err
return
}
var errors []error
var deleted []string
if len(garbageFiles) > 0 {
for _, file := range garbageFiles {
err := os.Remove(file)
if err != nil {
errors = append(errors, err)
} else {
deleted = append(deleted, file)
}
// If a lock file exists for this garbage artifact, remove that too.
lockFile := file + ".lock"
if _, err = os.Lstat(lockFile); err == nil {
err = os.Remove(lockFile)
if err != nil {
errors = append(errors, err)
}
}
}
}
if len(errors) > 0 {
errChan <- kerrors.NewAggregate(errors)
return
}
delFilesChan <- deleted
}()
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case delFiles := <-delFilesChan:
return delFiles, nil
case err := <-errChan:
return nil, err
}
}
}
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
// ArtifactExist returns a boolean indicating whether the meta.Artifact exists in storage and is a regular file.
func (s Storage) ArtifactExist(artifact meta.Artifact) bool {
fi, err := os.Lstat(s.LocalPath(artifact))
if err != nil {
return false
}
return fi.Mode().IsRegular()
}
// VerifyArtifact verifies if the Digest of the meta.Artifact matches the digest
// of the file in Storage. It returns an error if the digests don't match, or
// if it can't be verified.
func (s Storage) VerifyArtifact(artifact meta.Artifact) error {
if artifact.Digest == "" {
return fmt.Errorf("artifact has no digest")
}
d, err := digest.Parse(artifact.Digest)
if err != nil {
return fmt.Errorf("failed to parse artifact digest '%s': %w", artifact.Digest, err)
}
f, err := os.Open(s.LocalPath(artifact))
if err != nil {
return err
}
defer f.Close()
verifier := d.Verifier()
if _, err = io.Copy(verifier, f); err != nil {
return err
}
if !verifier.Verified() {
return fmt.Errorf("computed digest doesn't match '%s'", d.String())
}
return nil
}
// ArchiveFileFilter must return true if a file should not be included in the archive after inspecting the given path
// and/or os.FileInfo.
type ArchiveFileFilter func(p string, fi os.FileInfo) bool
// SourceIgnoreFilter returns an ArchiveFileFilter that filters out files matching sourceignore.VCSPatterns and any of
// the provided patterns.
// If an empty gitignore.Pattern slice is given, the matcher is set to sourceignore.NewDefaultMatcher.
func SourceIgnoreFilter(ps []gitignore.Pattern, domain []string) ArchiveFileFilter {
matcher := sourceignore.NewDefaultMatcher(ps, domain)
if len(ps) > 0 {
ps = append(sourceignore.VCSPatterns(domain), ps...)
matcher = sourceignore.NewMatcher(ps)
}
return func(p string, fi os.FileInfo) bool {
return matcher.Match(strings.Split(p, string(filepath.Separator)), fi.IsDir())
}
}
// Archive atomically archives the given directory as a tarball to the given meta.Artifact path, excluding
// directories and any ArchiveFileFilter matches. While archiving, any environment specific data (for example,
// the user and group name) is stripped from file headers.
// If successful, it sets the digest and last update time on the artifact.
func (s Storage) Archive(artifact *meta.Artifact, dir string, filter ArchiveFileFilter) (err error) {
if f, err := os.Stat(dir); os.IsNotExist(err) || !f.IsDir() {
return fmt.Errorf("invalid dir path: %s", dir)
}
localPath := s.LocalPath(*artifact)
tf, err := os.CreateTemp(filepath.Split(localPath))
if err != nil {
return err
}
tmpName := tf.Name()
defer func() {
if err != nil {
os.Remove(tmpName)
}
}()
d := intdigest.Canonical.Digester()
sz := &writeCounter{}
mw := io.MultiWriter(d.Hash(), tf, sz)
gw := gzip.NewWriter(mw)
tw := tar.NewWriter(gw)
if err := filepath.Walk(dir, func(p string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
// Ignore anything that is not a file or directories e.g. symlinks
if m := fi.Mode(); !(m.IsRegular() || m.IsDir()) {
return nil
}
// Skip filtered files
if filter != nil && filter(p, fi) {
return nil
}
header, err := tar.FileInfoHeader(fi, p)
if err != nil {
return err
}
// The name needs to be modified to maintain directory structure
// as tar.FileInfoHeader only has access to the base name of the file.
// Ref: https://golang.org/src/archive/tar/common.go?#L626
relFilePath := p
if filepath.IsAbs(dir) {
relFilePath, err = filepath.Rel(dir, p)
if err != nil {
return err
}
}
sanitizeHeader(relFilePath, header)
if err := tw.WriteHeader(header); err != nil {
return err
}
if !fi.Mode().IsRegular() {
return nil
}
f, err := os.Open(p)
if err != nil {
f.Close()
return err
}
if _, err := io.Copy(tw, f); err != nil {
f.Close()
return err
}
return f.Close()
}); err != nil {
tw.Close()
gw.Close()
tf.Close()
return err
}
if err := tw.Close(); err != nil {
gw.Close()
tf.Close()
return err
}
if err := gw.Close(); err != nil {
tf.Close()
return err
}
if err := tf.Close(); err != nil {
return err
}
if err := os.Chmod(tmpName, 0o600); err != nil {
return err
}
if err := oci.RenameWithFallback(tmpName, localPath); err != nil {
return err
}
artifact.Digest = d.Digest().String()
artifact.LastUpdateTime = metav1.Now()
artifact.Size = &sz.written
return nil
}
// AtomicWriteFile atomically writes the io.Reader contents to the meta.Artifact path.
// If successful, it sets the digest and last update time on the artifact.
func (s Storage) AtomicWriteFile(artifact *meta.Artifact, reader io.Reader, mode os.FileMode) (err error) {
localPath := s.LocalPath(*artifact)
tf, err := os.CreateTemp(filepath.Split(localPath))
if err != nil {
return err
}
tfName := tf.Name()
defer func() {
if err != nil {
os.Remove(tfName)
}
}()
d := intdigest.Canonical.Digester()
sz := &writeCounter{}
mw := io.MultiWriter(tf, d.Hash(), sz)
if _, err := io.Copy(mw, reader); err != nil {
tf.Close()
return err
}
if err := tf.Close(); err != nil {
return err
}
if err := os.Chmod(tfName, mode); err != nil {
return err
}
if err := oci.RenameWithFallback(tfName, localPath); err != nil {
return err
}
artifact.Digest = d.Digest().String()
artifact.LastUpdateTime = metav1.Now()
artifact.Size = &sz.written
return nil
}
// Copy atomically copies the io.Reader contents to the meta.Artifact path.
// If successful, it sets the digest and last update time on the artifact.
func (s Storage) Copy(artifact *meta.Artifact, reader io.Reader) (err error) {
localPath := s.LocalPath(*artifact)
tf, err := os.CreateTemp(filepath.Split(localPath))
if err != nil {
return err
}
tfName := tf.Name()
defer func() {
if err != nil {
os.Remove(tfName)
}
}()
d := intdigest.Canonical.Digester()
sz := &writeCounter{}
mw := io.MultiWriter(tf, d.Hash(), sz)
if _, err := io.Copy(mw, reader); err != nil {
tf.Close()
return err
}
if err := tf.Close(); err != nil {
return err
}
if err := oci.RenameWithFallback(tfName, localPath); err != nil {
return err
}
artifact.Digest = d.Digest().String()
artifact.LastUpdateTime = metav1.Now()
artifact.Size = &sz.written
return nil
}
// CopyFromPath atomically copies the contents of the given path to the path of the meta.Artifact.
// If successful, the digest and last update time on the artifact is set.
func (s Storage) CopyFromPath(artifact *meta.Artifact, path string) (err error) {
f, err := os.Open(path)
if err != nil {
return err
}
defer func() {
if cerr := f.Close(); cerr != nil && err == nil {
err = cerr
}
}()
err = s.Copy(artifact, f)
return err
}
// CopyToPath copies the contents in the (sub)path of the given artifact to the given path.
func (s Storage) CopyToPath(artifact *meta.Artifact, subPath, toPath string) error {
// create a tmp directory to store artifact
tmp, err := os.MkdirTemp("", "flux-include-")
if err != nil {
return err
}
defer os.RemoveAll(tmp)
// read artifact file content
localPath := s.LocalPath(*artifact)
f, err := os.Open(localPath)
if err != nil {
return err
}
defer f.Close()
// untar the artifact
untarPath := filepath.Join(tmp, "unpack")
if err = pkgtar.Untar(f, untarPath, pkgtar.WithMaxUntarSize(-1)); err != nil {
return err
}
// create the destination parent dir
if err = os.MkdirAll(filepath.Dir(toPath), os.ModePerm); err != nil {
return err
}
// copy the artifact content to the destination dir
fromPath, err := securejoin.SecureJoin(untarPath, subPath)
if err != nil {
return err
}
if err := oci.RenameWithFallback(fromPath, toPath); err != nil {
return err
}
return nil
}
// Symlink creates or updates a symbolic link for the given meta.Artifact and returns the URL for the symlink.
func (s Storage) Symlink(artifact meta.Artifact, linkName string) (string, error) {
localPath := s.LocalPath(artifact)
dir := filepath.Dir(localPath)
link := filepath.Join(dir, linkName)
tmpLink := link + ".tmp"
if err := os.Remove(tmpLink); err != nil && !os.IsNotExist(err) {
return "", err
}
if err := os.Symlink(localPath, tmpLink); err != nil {
return "", err
}
if err := os.Rename(tmpLink, link); err != nil {
return "", err
}
return fmt.Sprintf("http://%s/%s", s.Hostname, filepath.Join(filepath.Dir(artifact.Path), linkName)), nil
}
// Lock creates a file lock for the given meta.Artifact.
func (s Storage) Lock(artifact meta.Artifact) (unlock func(), err error) {
lockFile := s.LocalPath(artifact) + ".lock"
mutex := lockedfile.MutexAt(lockFile)
return mutex.Lock()
}
// LocalPath returns the secure local path of the given artifact (that is: relative to the Storage.BasePath).
func (s Storage) LocalPath(artifact meta.Artifact) string {
if artifact.Path == "" {
return ""
}
path, err := securejoin.SecureJoin(s.BasePath, artifact.Path)
if err != nil {
return ""
}
return path
}
// writeCounter is an implementation of io.Writer that only records the number
// of bytes written.
type writeCounter struct {
written int64
}
func (wc *writeCounter) Write(p []byte) (int, error) {
n := len(p)
wc.written += int64(n)
return n, nil
}
// sanitizeHeader modifies the tar.Header to be relative to the root of the
// archive and removes any environment specific data.
func sanitizeHeader(relP string, h *tar.Header) {
// Modify the name to be relative to the root of the archive,
// this ensures we maintain the same structure when extracting.
h.Name = relP
// We want to remove any environment-specific data as well; this
// ensures the checksum is purely content-based.
h.Gid = 0
h.Uid = 0
h.Uname = ""
h.Gname = ""
h.ModTime = time.Time{}
h.AccessTime = time.Time{}
h.ChangeTime = time.Time{}
// Override the mode to be the default for the type of file.
setDefaultMode(h)
}
// setDefaultMode sets the default mode for the given header.
func setDefaultMode(h *tar.Header) {
if h.FileInfo().IsDir() {
h.Mode = defaultDirMode
return
}
if h.FileInfo().Mode().IsRegular() {
mode := h.FileInfo().Mode()
if mode&os.ModeType == 0 && mode&0o111 != 0 {
h.Mode = defaultExeFileMode
return
}
h.Mode = defaultFileMode
return
}
}
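// Effect sketch (header values are illustrative): sanitizing strips
// ownership and timestamps and normalizes the mode, so archiving identical
// content always yields an identical digest.
//
//	h := &tar.Header{Name: "/build/app.sh", Typeflag: tar.TypeReg, Uid: 1000, Mode: 0o755}
//	sanitizeHeader("app.sh", h)
//	// h.Name == "app.sh", h.Uid == 0, h.ModTime.IsZero() == true,
//	// and h.Mode == defaultExeFileMode for an executable regular file.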
// ArtifactDir returns the artifact dir path in the form of
// '<kind>/<namespace>/<name>'.
func ArtifactDir(kind, namespace, name string) string {
kind = strings.ToLower(kind)
return path.Join(kind, namespace, name)
}
// ArtifactPath returns the artifact path in the form of
// '<kind>/<namespace>/<name>/<filename>'.
func ArtifactPath(kind, namespace, name, filename string) string {
return path.Join(ArtifactDir(kind, namespace, name), filename)
}
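// Example: for a GitRepository named "app" in namespace "flux-system"
// (names are illustrative):
//
//	ArtifactDir("GitRepository", "flux-system", "app")
//	// -> "gitrepository/flux-system/app"
//	ArtifactPath("GitRepository", "flux-system", "app", "latest.tar.gz")
//	// -> "gitrepository/flux-system/app/latest.tar.gz"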


@ -1,864 +0,0 @@
/*
Copyright 2025 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
. "github.com/onsi/gomega"
"github.com/fluxcd/pkg/apis/meta"
)
func TestStorageConstructor(t *testing.T) {
dir := t.TempDir()
if _, err := New("/nonexistent", "hostname", time.Minute, 2); err == nil {
t.Fatal("nonexistent path was allowable in storage constructor")
}
f, err := os.CreateTemp(dir, "")
if err != nil {
t.Fatalf("while creating temporary file: %v", err)
}
f.Close()
if _, err := New(f.Name(), "hostname", time.Minute, 2); err == nil {
os.Remove(f.Name())
t.Fatal("file path was accepted as basedir")
}
os.Remove(f.Name())
if _, err := New(dir, "hostname", time.Minute, 2); err != nil {
t.Fatalf("Valid path did not successfully return: %v", err)
}
}
// walkTar walks a tar.gz archive and looks for paths matching the given
// name. It does not match symlinks properly at this time because that's
// painful.
func walkTar(tarFile string, match string, dir bool) (int64, int64, bool, error) {
f, err := os.Open(tarFile)
if err != nil {
return 0, 0, false, fmt.Errorf("could not open file: %w", err)
}
defer f.Close()
gzr, err := gzip.NewReader(f)
if err != nil {
return 0, 0, false, fmt.Errorf("could not unzip file: %w", err)
}
defer gzr.Close()
tr := tar.NewReader(gzr)
for {
header, err := tr.Next()
if err == io.EOF {
break
} else if err != nil {
return 0, 0, false, fmt.Errorf("corrupt tarball reading header: %w", err)
}
switch header.Typeflag {
case tar.TypeDir:
if header.Name == match && dir {
return 0, header.Mode, true, nil
}
case tar.TypeReg:
if header.Name == match {
return header.Size, header.Mode, true, nil
}
default:
// skip
}
}
return 0, 0, false, nil
}
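// Usage sketch, as the archive tests below call it: look up a regular-file
// entry by its in-archive name.
//
//	size, mode, found, err := walkTar(storage.LocalPath(artifact), "manifest.yaml", false)
//	if err != nil {
//		t.Fatalf("failed reading tarball: %v", err)
//	}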
func TestStorage_Archive(t *testing.T) {
dir := t.TempDir()
storage, err := New(dir, "hostname", time.Minute, 2)
if err != nil {
t.Fatalf("error while bootstrapping storage: %v", err)
}
type dummyFile struct {
content []byte
mode int64
}
createFiles := func(files map[string]dummyFile) (dir string, err error) {
dir = t.TempDir()
for name, df := range files {
absPath := filepath.Join(dir, name)
if err = os.MkdirAll(filepath.Dir(absPath), 0o750); err != nil {
return
}
f, err := os.Create(absPath)
if err != nil {
return "", fmt.Errorf("could not create file %q: %w", absPath, err)
}
if n, err := f.Write(df.content); err != nil {
f.Close()
return "", fmt.Errorf("could not write %d bytes to file %q: %w", n, f.Name(), err)
}
f.Close()
if df.mode != 0 {
if err = os.Chmod(absPath, os.FileMode(df.mode)); err != nil {
return "", fmt.Errorf("could not chmod file %q: %w", absPath, err)
}
}
}
return
}
matchFiles := func(t *testing.T, storage *Storage, artifact meta.Artifact, files map[string]dummyFile, dirs []string) {
t.Helper()
for name, df := range files {
mustExist := !(name[0:1] == "!")
if !mustExist {
name = name[1:]
}
s, m, exist, err := walkTar(storage.LocalPath(artifact), name, false)
if err != nil {
t.Fatalf("failed reading tarball: %v", err)
}
if bs := int64(len(df.content)); s != bs {
t.Fatalf("%q size %v != %v", name, s, bs)
}
if exist != mustExist {
if mustExist {
t.Errorf("could not find file %q in tarball", name)
} else {
t.Errorf("tarball contained excluded file %q", name)
}
}
expectMode := df.mode
if expectMode == 0 {
expectMode = defaultFileMode
}
if exist && m != expectMode {
t.Fatalf("%q mode %v != %v", name, m, expectMode)
}
}
for _, name := range dirs {
mustExist := !(name[0:1] == "!")
if !mustExist {
name = name[1:]
}
_, m, exist, err := walkTar(storage.LocalPath(artifact), name, true)
if err != nil {
t.Fatalf("failed reading tarball: %v", err)
}
if exist != mustExist {
if mustExist {
t.Errorf("could not find dir %q in tarball", name)
} else {
t.Errorf("tarball contained excluded file %q", name)
}
}
if exist && m != defaultDirMode {
t.Fatalf("%q mode %v != %v", name, m, defaultDirMode)
}
}
}
tests := []struct {
name string
files map[string]dummyFile
filter ArchiveFileFilter
want map[string]dummyFile
wantDirs []string
wantErr bool
}{
{
name: "no filter",
files: map[string]dummyFile{
".git/config": {},
"file.jpg": {content: []byte(`contents`)},
"manifest.yaml": {},
},
filter: nil,
want: map[string]dummyFile{
".git/config": {},
"file.jpg": {content: []byte(`contents`)},
"manifest.yaml": {},
},
},
{
name: "exclude VCS",
files: map[string]dummyFile{
".git/config": {},
"manifest.yaml": {},
},
wantDirs: []string{
"!.git",
},
filter: SourceIgnoreFilter(nil, nil),
want: map[string]dummyFile{
"!.git/config": {},
"manifest.yaml": {},
},
},
{
name: "custom",
files: map[string]dummyFile{
".git/config": {},
"custom": {},
"horse.jpg": {},
},
filter: SourceIgnoreFilter([]gitignore.Pattern{
gitignore.ParsePattern("custom", nil),
}, nil),
want: map[string]dummyFile{
"!git/config": {},
"!custom": {},
"horse.jpg": {},
},
wantErr: false,
},
{
name: "including directories",
files: map[string]dummyFile{
"test/.gitkeep": {},
},
filter: SourceIgnoreFilter([]gitignore.Pattern{
gitignore.ParsePattern("custom", nil),
}, nil),
wantDirs: []string{
"test",
},
wantErr: false,
},
{
name: "sets default file modes",
files: map[string]dummyFile{
"test/file": {
mode: 0o666,
},
"test/executable": {
mode: 0o777,
},
},
want: map[string]dummyFile{
"test/file": {
mode: defaultFileMode,
},
"test/executable": {
mode: defaultExeFileMode,
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dir, err := createFiles(tt.files)
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dir)
artifact := meta.Artifact{
Path: filepath.Join(randStringRunes(10), randStringRunes(10), randStringRunes(10)+".tar.gz"),
}
if err := storage.MkdirAll(artifact); err != nil {
t.Fatalf("artifact directory creation failed: %v", err)
}
if err := storage.Archive(&artifact, dir, tt.filter); (err != nil) != tt.wantErr {
t.Errorf("Archive() error = %v, wantErr %v", err, tt.wantErr)
}
matchFiles(t, storage, artifact, tt.want, tt.wantDirs)
})
}
}
func TestStorage_Remove(t *testing.T) {
t.Run("removes file", func(t *testing.T) {
g := NewWithT(t)
dir := t.TempDir()
s, err := New(dir, "", 0, 0)
g.Expect(err).ToNot(HaveOccurred())
artifact := meta.Artifact{
Path: filepath.Join(dir, "test.txt"),
}
g.Expect(s.MkdirAll(artifact)).To(Succeed())
g.Expect(s.AtomicWriteFile(&artifact, bytes.NewReader([]byte("test")), 0o600)).To(Succeed())
g.Expect(s.ArtifactExist(artifact)).To(BeTrue())
g.Expect(s.Remove(artifact)).To(Succeed())
g.Expect(s.ArtifactExist(artifact)).To(BeFalse())
})
t.Run("error if file does not exist", func(t *testing.T) {
g := NewWithT(t)
dir := t.TempDir()
s, err := New(dir, "", 0, 0)
g.Expect(err).ToNot(HaveOccurred())
artifact := meta.Artifact{
Path: filepath.Join(dir, "test.txt"),
}
err = s.Remove(artifact)
g.Expect(err).To(HaveOccurred())
g.Expect(errors.Is(err, os.ErrNotExist)).To(BeTrue())
})
}
func TestStorageRemoveAllButCurrent(t *testing.T) {
t.Run("bad directory in archive", func(t *testing.T) {
dir := t.TempDir()
s, err := New(dir, "hostname", time.Minute, 2)
if err != nil {
t.Fatalf("Valid path did not successfully return: %v", err)
}
if _, err := s.RemoveAllButCurrent(meta.Artifact{Path: filepath.Join(dir, "really", "nonexistent")}); err == nil {
t.Fatal("Did not error while pruning non-existent path")
}
})
t.Run("collect names of deleted items", func(t *testing.T) {
g := NewWithT(t)
dir := t.TempDir()
s, err := New(dir, "hostname", time.Minute, 2)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := meta.Artifact{
Path: filepath.Join("foo", "bar", "artifact1.tar.gz"),
}
// Create artifact dir and artifacts.
artifactDir := filepath.Join(dir, "foo", "bar")
g.Expect(os.MkdirAll(artifactDir, 0o750)).NotTo(HaveOccurred())
current := []string{
filepath.Join(artifactDir, "artifact1.tar.gz"),
}
wantDeleted := []string{
filepath.Join(artifactDir, "file1.txt"),
filepath.Join(artifactDir, "file2.txt"),
}
createFile := func(files []string) {
for _, c := range files {
f, err := os.Create(c)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(f.Close()).ToNot(HaveOccurred())
}
}
createFile(current)
createFile(wantDeleted)
_, err = s.Symlink(artifact, "latest.tar.gz")
g.Expect(err).ToNot(HaveOccurred(), "failed to create symlink")
deleted, err := s.RemoveAllButCurrent(artifact)
g.Expect(err).ToNot(HaveOccurred(), "failed to remove all but current")
g.Expect(deleted).To(Equal(wantDeleted))
})
}
func TestStorageRemoveAll(t *testing.T) {
tests := []struct {
name string
artifactPath string
createArtifactPath bool
wantDeleted string
}{
{
name: "delete non-existent path",
artifactPath: filepath.Join("foo", "bar", "artifact1.tar.gz"),
createArtifactPath: false,
wantDeleted: "",
},
{
name: "delete existing path",
artifactPath: filepath.Join("foo", "bar", "artifact1.tar.gz"),
createArtifactPath: true,
wantDeleted: filepath.Join("foo", "bar"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
dir := t.TempDir()
s, err := New(dir, "hostname", time.Minute, 2)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := meta.Artifact{
Path: tt.artifactPath,
}
if tt.createArtifactPath {
g.Expect(os.MkdirAll(filepath.Join(dir, tt.artifactPath), 0o750)).ToNot(HaveOccurred())
}
deleted, err := s.RemoveAll(artifact)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(deleted).To(ContainSubstring(tt.wantDeleted), "unexpected deleted path")
})
}
}
func TestStorageCopyFromPath(t *testing.T) {
type File struct {
Name string
Content []byte
}
dir := t.TempDir()
storage, err := New(dir, "hostname", time.Minute, 2)
if err != nil {
t.Fatalf("error while bootstrapping storage: %v", err)
}
createFile := func(file *File) (absPath string, err error) {
dir = t.TempDir()
absPath = filepath.Join(dir, file.Name)
if err = os.MkdirAll(filepath.Dir(absPath), 0o750); err != nil {
return
}
f, err := os.Create(absPath)
if err != nil {
return "", fmt.Errorf("could not create file %q: %w", absPath, err)
}
if n, err := f.Write(file.Content); err != nil {
f.Close()
return "", fmt.Errorf("could not write %d bytes to file %q: %w", n, f.Name(), err)
}
f.Close()
return
}
matchFile := func(t *testing.T, storage *Storage, artifact meta.Artifact, file *File, expectMismatch bool) {
t.Helper()
c, err := os.ReadFile(storage.LocalPath(artifact))
if err != nil {
t.Fatalf("failed reading file: %v", err)
}
if (string(c) != string(file.Content)) != expectMismatch {
t.Errorf("unexpected artifact content (expectMismatch=%v), got: %q, want: %q", expectMismatch, string(c), string(file.Content))
}
}
tests := []struct {
name string
file *File
want *File
expectMismatch bool
}{
{
name: "content match",
file: &File{
Name: "manifest.yaml",
Content: []byte(`contents`),
},
want: &File{
Name: "manifest.yaml",
Content: []byte(`contents`),
},
},
{
name: "content not match",
file: &File{
Name: "manifest.yaml",
Content: []byte(`contents`),
},
want: &File{
Name: "manifest.yaml",
Content: []byte(`mismatch contents`),
},
expectMismatch: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
absPath, err := createFile(tt.file)
if err != nil {
t.Error(err)
return
}
artifact := meta.Artifact{
Path: filepath.Join(randStringRunes(10), randStringRunes(10), randStringRunes(10)),
}
if err := storage.MkdirAll(artifact); err != nil {
t.Fatalf("artifact directory creation failed: %v", err)
}
if err := storage.CopyFromPath(&artifact, absPath); err != nil {
t.Errorf("CopyFromPath() error = %v", err)
}
matchFile(t, storage, artifact, tt.want, tt.expectMismatch)
})
}
}
func TestStorage_getGarbageFiles(t *testing.T) {
artifactFolder := filepath.Join("foo", "bar")
tests := []struct {
name string
artifactPaths []string
createPause time.Duration
ttl time.Duration
maxItemsToBeRetained int
totalCountLimit int
wantDeleted []string
}{
{
name: "delete files based on maxItemsToBeRetained",
artifactPaths: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact3.tar.gz"),
filepath.Join(artifactFolder, "artifact4.tar.gz"),
filepath.Join(artifactFolder, "artifact5.tar.gz"),
},
createPause: time.Millisecond * 10,
ttl: time.Minute * 2,
totalCountLimit: 10,
maxItemsToBeRetained: 2,
wantDeleted: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact3.tar.gz"),
},
},
{
name: "delete files based on maxItemsToBeRetained, ignore lock files",
artifactPaths: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact1.tar.gz.lock"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz.lock"),
filepath.Join(artifactFolder, "artifact3.tar.gz"),
filepath.Join(artifactFolder, "artifact3.tar.gz.lock"),
filepath.Join(artifactFolder, "artifact4.tar.gz"),
filepath.Join(artifactFolder, "artifact5.tar.gz"),
},
createPause: time.Millisecond * 10,
ttl: time.Minute * 2,
totalCountLimit: 10,
maxItemsToBeRetained: 2,
wantDeleted: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact3.tar.gz"),
},
},
{
name: "delete files based on ttl",
artifactPaths: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact3.tar.gz"),
filepath.Join(artifactFolder, "artifact4.tar.gz"),
filepath.Join(artifactFolder, "artifact5.tar.gz"),
},
createPause: time.Second * 1,
ttl: time.Second*3 + time.Millisecond*500,
totalCountLimit: 10,
maxItemsToBeRetained: 4,
wantDeleted: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
},
},
{
name: "delete files based on ttl, ignore lock files",
artifactPaths: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact1.tar.gz.lock"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz.lock"),
filepath.Join(artifactFolder, "artifact3.tar.gz"),
filepath.Join(artifactFolder, "artifact4.tar.gz"),
filepath.Join(artifactFolder, "artifact5.tar.gz"),
},
createPause: time.Second * 1,
ttl: time.Second*3 + time.Millisecond*500,
totalCountLimit: 10,
maxItemsToBeRetained: 4,
wantDeleted: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
},
},
{
name: "delete files based on ttl and maxItemsToBeRetained",
artifactPaths: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact3.tar.gz"),
filepath.Join(artifactFolder, "artifact4.tar.gz"),
filepath.Join(artifactFolder, "artifact5.tar.gz"),
filepath.Join(artifactFolder, "artifact6.tar.gz"),
},
createPause: time.Second * 1,
ttl: time.Second*5 + time.Millisecond*500,
totalCountLimit: 10,
maxItemsToBeRetained: 4,
wantDeleted: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
},
},
{
name: "delete files based on ttl and maxItemsToBeRetained and totalCountLimit",
artifactPaths: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact3.tar.gz"),
filepath.Join(artifactFolder, "artifact4.tar.gz"),
filepath.Join(artifactFolder, "artifact5.tar.gz"),
filepath.Join(artifactFolder, "artifact6.tar.gz"),
},
createPause: time.Millisecond * 500,
ttl: time.Millisecond * 500,
totalCountLimit: 3,
maxItemsToBeRetained: 2,
wantDeleted: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact3.tar.gz"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
dir := t.TempDir()
s, err := New(dir, "hostname", tt.ttl, tt.maxItemsToBeRetained)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := meta.Artifact{
Path: tt.artifactPaths[len(tt.artifactPaths)-1],
}
g.Expect(os.MkdirAll(filepath.Join(dir, artifactFolder), 0o750)).ToNot(HaveOccurred())
for _, artifactPath := range tt.artifactPaths {
f, err := os.Create(filepath.Join(dir, artifactPath))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(f.Close()).ToNot(HaveOccurred())
time.Sleep(tt.createPause)
}
deletedPaths, err := s.getGarbageFiles(artifact, tt.totalCountLimit, tt.maxItemsToBeRetained, tt.ttl)
g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files")
g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths)))
for _, wantDeletedPath := range tt.wantDeleted {
present := false
for _, deletedPath := range deletedPaths {
if strings.Contains(deletedPath, wantDeletedPath) {
present = true
break
}
}
if !present {
g.Fail(fmt.Sprintf("expected file to be deleted, still exists: %s", wantDeletedPath))
}
}
})
}
}
func TestStorage_GarbageCollect(t *testing.T) {
artifactFolder := filepath.Join("foo", "bar")
tests := []struct {
name string
artifactPaths []string
wantCollected []string
wantDeleted []string
wantErr string
ctxTimeout time.Duration
}{
{
name: "garbage collects",
artifactPaths: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact1.tar.gz.lock"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz.lock"),
filepath.Join(artifactFolder, "artifact3.tar.gz"),
filepath.Join(artifactFolder, "artifact4.tar.gz"),
},
wantCollected: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
},
wantDeleted: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact1.tar.gz.lock"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz.lock"),
},
ctxTimeout: time.Second * 1,
},
{
name: "garbage collection fails with context timeout",
artifactPaths: []string{
filepath.Join(artifactFolder, "artifact1.tar.gz"),
filepath.Join(artifactFolder, "artifact2.tar.gz"),
filepath.Join(artifactFolder, "artifact3.tar.gz"),
filepath.Join(artifactFolder, "artifact4.tar.gz"),
},
wantErr: "context deadline exceeded",
ctxTimeout: time.Nanosecond * 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
dir := t.TempDir()
s, err := New(dir, "hostname", time.Second*2, 2)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
artifact := meta.Artifact{
Path: tt.artifactPaths[len(tt.artifactPaths)-1],
}
g.Expect(os.MkdirAll(filepath.Join(dir, artifactFolder), 0o750)).ToNot(HaveOccurred())
for i, artifactPath := range tt.artifactPaths {
f, err := os.Create(filepath.Join(dir, artifactPath))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(f.Close()).ToNot(HaveOccurred())
if i != len(tt.artifactPaths)-1 {
time.Sleep(time.Second * 1)
}
}
collectedPaths, err := s.GarbageCollect(context.TODO(), artifact, tt.ctxTimeout)
if tt.wantErr == "" {
g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files")
} else {
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring(tt.wantErr))
}
if len(tt.wantCollected) > 0 {
g.Expect(len(tt.wantCollected)).To(Equal(len(collectedPaths)))
for _, wantCollectedPath := range tt.wantCollected {
present := false
for _, collectedPath := range collectedPaths {
if strings.Contains(collectedPath, wantCollectedPath) {
g.Expect(collectedPath).ToNot(BeAnExistingFile())
present = true
break
}
}
if !present {
g.Fail(fmt.Sprintf("expected file to be garbage collected, still exists: %s", wantCollectedPath))
}
}
}
for _, delFile := range tt.wantDeleted {
g.Expect(filepath.Join(dir, delFile)).ToNot(BeAnExistingFile())
}
})
}
}
func TestStorage_VerifyArtifact(t *testing.T) {
g := NewWithT(t)
dir := t.TempDir()
s, err := New(dir, "", 0, 0)
g.Expect(err).ToNot(HaveOccurred(), "failed to create new storage")
g.Expect(os.WriteFile(filepath.Join(dir, "artifact"), []byte("test"), 0o600)).To(Succeed())
t.Run("artifact without digest", func(t *testing.T) {
g := NewWithT(t)
err := s.VerifyArtifact(meta.Artifact{})
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError("artifact has no digest"))
})
t.Run("artifact with invalid digest", func(t *testing.T) {
g := NewWithT(t)
err := s.VerifyArtifact(meta.Artifact{Digest: "invalid"})
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError("failed to parse artifact digest 'invalid': invalid checksum digest format"))
})
t.Run("artifact with invalid path", func(t *testing.T) {
g := NewWithT(t)
err := s.VerifyArtifact(meta.Artifact{
Digest: "sha256:9ba7a35ce8acd3557fe30680ef193ca7a36bb5dc62788f30de7122a0a5beab69",
Path: "invalid",
})
g.Expect(err).To(HaveOccurred())
g.Expect(errors.Is(err, os.ErrNotExist)).To(BeTrue())
})
t.Run("artifact with digest mismatch", func(t *testing.T) {
g := NewWithT(t)
err := s.VerifyArtifact(meta.Artifact{
Digest: "sha256:9ba7a35ce8acd3557fe30680ef193ca7a36bb5dc62788f30de7122a0a5beab69",
Path: "artifact",
})
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError("computed digest doesn't match 'sha256:9ba7a35ce8acd3557fe30680ef193ca7a36bb5dc62788f30de7122a0a5beab69'"))
})
t.Run("artifact with digest match", func(t *testing.T) {
g := NewWithT(t)
err := s.VerifyArtifact(meta.Artifact{
Digest: "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08",
Path: "artifact",
})
g.Expect(err).ToNot(HaveOccurred())
})
}
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz1234567890")
func randStringRunes(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}

148
main.go

@ -18,8 +18,6 @@ package main
import (
"fmt"
"net"
"net/http"
"os"
"time"
@ -39,6 +37,10 @@ import (
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
artcfg "github.com/fluxcd/pkg/artifact/config"
artdigest "github.com/fluxcd/pkg/artifact/digest"
artsrv "github.com/fluxcd/pkg/artifact/server"
artstore "github.com/fluxcd/pkg/artifact/storage"
"github.com/fluxcd/pkg/auth"
pkgcache "github.com/fluxcd/pkg/cache"
"github.com/fluxcd/pkg/git"
@ -54,13 +56,11 @@ import (
"github.com/fluxcd/pkg/runtime/probes"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
intstorage "github.com/fluxcd/source-controller/internal/storage"
// +kubebuilder:scaffold:imports
"github.com/fluxcd/source-controller/internal/cache"
"github.com/fluxcd/source-controller/internal/controller"
intdigest "github.com/fluxcd/source-controller/internal/digest"
"github.com/fluxcd/source-controller/internal/features"
"github.com/fluxcd/source-controller/internal/helm"
"github.com/fluxcd/source-controller/internal/helm/registry"
@ -96,32 +96,27 @@ func main() {
)
var (
metricsAddr string
eventsAddr string
healthAddr string
storagePath string
storageAddr string
storageAdvAddr string
concurrent int
requeueDependency time.Duration
helmIndexLimit int64
helmChartLimit int64
helmChartFileLimit int64
clientOptions client.Options
logOptions logger.Options
leaderElectionOptions leaderelection.Options
rateLimiterOptions helper.RateLimiterOptions
featureGates feathelper.FeatureGates
watchOptions helper.WatchOptions
intervalJitterOptions jitter.IntervalOptions
helmCacheMaxSize int
helmCacheTTL string
helmCachePurgeInterval string
artifactRetentionTTL time.Duration
artifactRetentionRecords int
artifactDigestAlgo string
tokenCacheOptions pkgcache.TokenFlags
defaultServiceAccount string
metricsAddr string
eventsAddr string
healthAddr string
concurrent int
requeueDependency time.Duration
helmIndexLimit int64
helmChartLimit int64
helmChartFileLimit int64
artifactOptions artcfg.Options
clientOptions client.Options
logOptions logger.Options
leaderElectionOptions leaderelection.Options
rateLimiterOptions helper.RateLimiterOptions
featureGates feathelper.FeatureGates
watchOptions helper.WatchOptions
intervalJitterOptions jitter.IntervalOptions
helmCacheMaxSize int
helmCacheTTL string
helmCachePurgeInterval string
tokenCacheOptions pkgcache.TokenFlags
defaultServiceAccount string
)
flag.StringVar(&metricsAddr, "metrics-addr", envOrDefault("METRICS_ADDR", ":8080"),
@ -129,12 +124,6 @@ func main() {
flag.StringVar(&eventsAddr, "events-addr", envOrDefault("EVENTS_ADDR", ""),
"The address of the events receiver.")
flag.StringVar(&healthAddr, "health-addr", ":9440", "The address the health endpoint binds to.")
flag.StringVar(&storagePath, "storage-path", envOrDefault("STORAGE_PATH", ""),
"The local storage path.")
flag.StringVar(&storageAddr, "storage-addr", envOrDefault("STORAGE_ADDR", ":9090"),
"The address the static file server binds to.")
flag.StringVar(&storageAdvAddr, "storage-adv-addr", envOrDefault("STORAGE_ADV_ADDR", ""),
"The advertised address of the static file server.")
flag.IntVar(&concurrent, "concurrent", 2, "The number of concurrent reconciles per controller.")
flag.Int64Var(&helmIndexLimit, "helm-index-max-size", helm.MaxIndexSize,
"The max allowed size in bytes of a Helm repository index file.")
@ -154,15 +143,10 @@ func main() {
"The list of key exchange algorithms to use for ssh connections, arranged from most preferred to the least.")
flag.StringSliceVar(&git.HostKeyAlgos, "ssh-hostkey-algos", []string{},
"The list of hostkey algorithms to use for ssh connections, arranged from most preferred to the least.")
flag.DurationVar(&artifactRetentionTTL, "artifact-retention-ttl", 60*time.Second,
"The duration of time that artifacts from previous reconciliations will be kept in storage before being garbage collected.")
flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2,
"The maximum number of artifacts to be kept in storage after a garbage collection.")
flag.StringVar(&artifactDigestAlgo, "artifact-digest-algo", intdigest.Canonical.String(),
"The algorithm to use to calculate the digest of artifacts.")
flag.StringVar(&defaultServiceAccount, auth.ControllerFlagDefaultServiceAccount,
"", "Default service account to use for workload identity when not specified in resources.")
artifactOptions.BindFlags(flag.CommandLine)
clientOptions.BindFlags(flag.CommandLine)
logOptions.BindFlags(flag.CommandLine)
leaderElectionOptions.BindFlags(flag.CommandLine)
@ -210,7 +194,19 @@ func main() {
metrics := helper.NewMetrics(mgr, metrics.MustMakeRecorder(), sourcev1.SourceFinalizer)
cacheRecorder := cache.MustMakeMetrics()
eventRecorder := mustSetupEventRecorder(mgr, eventsAddr, controllerName)
storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, artifactDigestAlgo)
algo, err := artdigest.AlgorithmForName(artifactOptions.ArtifactDigestAlgo)
if err != nil {
setupLog.Error(err, "unable to configure canonical digest algorithm")
os.Exit(1)
}
artdigest.Canonical = algo
storage, err := artstore.New(&artifactOptions)
if err != nil {
setupLog.Error(err, "unable to configure artifact storage")
os.Exit(1)
}
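// Usage sketch (the algorithm name is an assumed example): AlgorithmForName
// resolves a digest algorithm from its string name, and assigning the result
// to artdigest.Canonical makes it the default for all artifact digests.
//
//	algo, err := artdigest.AlgorithmForName("sha256")
//	if err != nil {
//		// unknown algorithm: fail fast before any artifact is written
//	}
//	artdigest.Canonical = algo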
mustSetupHelmLimits(helmIndexLimit, helmChartLimit, helmChartFileLimit)
helmIndexCache, helmIndexCacheItemTTL := mustInitHelmCache(helmCacheMaxSize, helmCacheTTL, helmCachePurgeInterval)
@ -315,7 +311,11 @@ func main() {
// to handle that.
<-mgr.Elected()
startFileServer(storage.BasePath, storageAddr)
// Start the artifact server if running as leader.
if err := artsrv.Start(ctx, &artifactOptions); err != nil {
setupLog.Error(err, "artifact server error")
os.Exit(1)
}
}()
setupLog.Info("starting manager")
@ -325,17 +325,6 @@ func main() {
}
}
func startFileServer(path string, address string) {
setupLog.Info("starting file server")
fs := http.FileServer(http.Dir(path))
mux := http.NewServeMux()
mux.Handle("/", fs)
err := http.ListenAndServe(address, mux)
if err != nil {
setupLog.Error(err, "file server error")
}
}
func mustSetupEventRecorder(mgr ctrl.Manager, eventsAddr, controllerName string) record.EventRecorder {
eventRecorder, err := events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName)
if err != nil {
@ -450,55 +439,6 @@ func mustInitHelmCache(maxSize int, itemTTL, purgeInterval string) (*cache.Cache
return cache.New(maxSize, interval), ttl
}
func mustInitStorage(path string,
storageAdvAddr string,
artifactRetentionTTL time.Duration,
artifactRetentionRecords int,
artifactDigestAlgo string) *intstorage.Storage {
if storageAdvAddr == "" {
storageAdvAddr = determineAdvStorageAddr(storageAdvAddr)
}
if artifactDigestAlgo != intdigest.Canonical.String() {
algo, err := intdigest.AlgorithmForName(artifactDigestAlgo)
if err != nil {
setupLog.Error(err, "unable to configure canonical digest algorithm")
os.Exit(1)
}
intdigest.Canonical = algo
}
storage, err := intstorage.New(path, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords)
if err != nil {
setupLog.Error(err, "unable to initialise storage")
os.Exit(1)
}
return storage
}
func determineAdvStorageAddr(storageAddr string) string {
host, port, err := net.SplitHostPort(storageAddr)
if err != nil {
setupLog.Error(err, "unable to parse storage address")
os.Exit(1)
}
switch host {
case "":
host = "localhost"
case "0.0.0.0":
host = os.Getenv("HOSTNAME")
if host == "" {
hn, err := os.Hostname()
if err != nil {
setupLog.Error(err, "0.0.0.0 specified in storage addr but hostname is invalid")
os.Exit(1)
}
host = hn
}
}
return net.JoinHostPort(host, port)
}
func envOrDefault(envName, defaultValue string) string {
ret := os.Getenv(envName)
if ret != "" {