controllers: use digest for Bucket revision

Signed-off-by: Hidde Beydals <hello@hidde.co>
This commit is contained in:
Hidde Beydals 2022-11-11 14:20:43 +00:00
parent f4eae19045
commit 83b6fdcdd8
5 changed files with 720 additions and 303 deletions

View File

@ -18,17 +18,14 @@ package controllers
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/fluxcd/source-controller/pkg/azure"
"github.com/opencontainers/go-digest"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
corev1 "k8s.io/api/core/v1"
@ -51,10 +48,14 @@ import (
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/sourceignore"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
intdigest "github.com/fluxcd/source-controller/internal/digest"
serror "github.com/fluxcd/source-controller/internal/error"
"github.com/fluxcd/source-controller/internal/index"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/pkg/azure"
"github.com/fluxcd/source-controller/pkg/gcp"
"github.com/fluxcd/source-controller/pkg/minio"
)
@ -154,83 +155,7 @@ type BucketProvider interface {
// bucketReconcileFunc is the function type for all the v1beta2.Bucket
// (sub)reconcile functions. The type implementations are grouped and
// executed serially to perform the complete reconcile of the object.
type bucketReconcileFunc func(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error)
// etagIndex is an index of storage object keys and their Etag values.
type etagIndex struct {
sync.RWMutex
index map[string]string
}
// newEtagIndex returns a new etagIndex with an empty initialized index.
func newEtagIndex() *etagIndex {
return &etagIndex{
index: make(map[string]string),
}
}
func (i *etagIndex) Add(key, etag string) {
i.Lock()
defer i.Unlock()
i.index[key] = etag
}
func (i *etagIndex) Delete(key string) {
i.Lock()
defer i.Unlock()
delete(i.index, key)
}
func (i *etagIndex) Get(key string) string {
i.RLock()
defer i.RUnlock()
return i.index[key]
}
func (i *etagIndex) Has(key string) bool {
i.RLock()
defer i.RUnlock()
_, ok := i.index[key]
return ok
}
func (i *etagIndex) Index() map[string]string {
i.RLock()
defer i.RUnlock()
index := make(map[string]string)
for k, v := range i.index {
index[k] = v
}
return index
}
func (i *etagIndex) Len() int {
i.RLock()
defer i.RUnlock()
return len(i.index)
}
// Revision calculates the SHA256 checksum of the index.
// The keys are stable sorted, and the SHA256 sum is then calculated for the
// string representation of the key/value pairs, each pair written on a newline
// with a space between them. The sum result is returned as a string.
func (i *etagIndex) Revision() (string, error) {
i.RLock()
defer i.RUnlock()
keyIndex := make([]string, 0, len(i.index))
for k := range i.index {
keyIndex = append(keyIndex, k)
}
sort.Strings(keyIndex)
sum := sha256.New()
for _, k := range keyIndex {
if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i.index[k]))); err != nil {
return "", err
}
}
return fmt.Sprintf("%x", sum.Sum(nil)), nil
}
type bucketReconcileFunc func(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *index.Digester, dir string) (sreconcile.Result, error)
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
@ -371,7 +296,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, sp *patch.SerialPatche
var (
res sreconcile.Result
resErr error
index = newEtagIndex()
index = index.NewDigester()
)
for _, rec := range reconcilers {
@ -397,7 +322,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, sp *patch.SerialPatche
}
// notify emits notification related to the reconciliation.
func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.Bucket, index *etagIndex, res sreconcile.Result, resErr error) {
func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.Bucket, index *index.Digester, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
@ -443,7 +368,7 @@ func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.
// condition is added.
// The hostname of any URL in the Status of the object are updated, to ensure
// they match the Storage server hostname of current runtime.
func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) {
func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, _ *index.Digester, _ string) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
_ = r.garbageCollect(ctx, obj)
@ -484,7 +409,7 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.Seria
// When a SecretRef is defined, it attempts to fetch the Secret before calling
// the provider. If this fails, it records v1beta2.FetchFailedCondition=True on
// the object and returns early.
func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *index.Digester, dir string) (sreconcile.Result, error) {
secret, err := r.getBucketSecret(ctx, obj)
if err != nil {
e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason}
@ -538,26 +463,21 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
return sreconcile.ResultEmpty, e
}
// Calculate revision
revision, err := index.Revision()
if err != nil {
return sreconcile.ResultEmpty, &serror.Event{
Err: fmt.Errorf("failed to calculate revision: %w", err),
Reason: meta.FailedReason,
}
// Check if index has changed compared to current Artifact revision.
var changed bool
if artifact := obj.Status.Artifact; artifact != nil && artifact.Revision != "" {
curRev := backwardsCompatibleDigest(artifact.Revision)
changed = curRev != index.Digest(curRev.Algorithm())
}
// Mark observations about the revision on the object
defer func() {
// As fetchIndexFiles can make last-minute modifications to the etag
// index, we need to re-calculate the revision at the end
revision, err := index.Revision()
if err != nil {
ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision after fetching etag index")
return
}
// Fetch the bucket objects if required to.
if artifact := obj.GetArtifact(); artifact == nil || changed {
// Mark observations about the revision on the object
defer func() {
// As fetchIndexFiles can make last-minute modifications to the etag
// index, we need to re-calculate the revision at the end
revision := index.Digest(intdigest.Canonical)
if !obj.GetArtifact().HasRevision(revision) {
message := fmt.Sprintf("new upstream revision '%s'", revision)
if obj.GetArtifact() != nil {
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
@ -567,10 +487,8 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
ctrl.LoggerFrom(ctx).Error(err, "failed to patch")
return
}
}
}()
}()
if !obj.GetArtifact().HasRevision(revision) {
if err = fetchIndexFiles(ctx, provider, obj, index, dir); err != nil {
e := &serror.Event{Err: err, Reason: sourcev1.BucketOperationFailedReason}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
@ -591,32 +509,32 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
// early.
// On a successful archive, the Artifact in the Status of the object is set,
// and the symlink in the Storage is updated to its path.
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *index.Digester, dir string) (sreconcile.Result, error) {
// Calculate revision
revision, err := index.Revision()
if err != nil {
return sreconcile.ResultEmpty, &serror.Event{
Err: fmt.Errorf("failed to calculate revision of new artifact: %w", err),
Reason: meta.FailedReason,
}
}
revision := index.Digest(intdigest.Canonical)
// Create artifact
artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision.String(), fmt.Sprintf("%s.tar.gz", revision.Encoded()))
// Set the ArtifactInStorageCondition if there's no drift.
defer func() {
if obj.GetArtifact().HasRevision(artifact.Revision) {
conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
conditions.MarkTrue(obj, sourcev1.ArtifactInStorageCondition, meta.SucceededReason,
"stored artifact: revision '%s'", artifact.Revision)
if curArtifact := obj.GetArtifact(); curArtifact != nil && curArtifact.Revision != "" {
curRev := backwardsCompatibleDigest(curArtifact.Revision)
if index.Digest(curRev.Algorithm()) == curRev {
conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
conditions.MarkTrue(obj, sourcev1.ArtifactInStorageCondition, meta.SucceededReason,
"stored artifact: revision '%s'", artifact.Revision)
}
}
}()
// The artifact is up-to-date
if obj.GetArtifact().HasRevision(artifact.Revision) {
r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision)
return sreconcile.ResultSuccess, nil
if curArtifact := obj.GetArtifact(); curArtifact != nil && curArtifact.Revision != "" {
curRev := backwardsCompatibleDigest(curArtifact.Revision)
if index.Digest(curRev.Algorithm()) == curRev {
r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision)
return sreconcile.ResultSuccess, nil
}
}
// Ensure target path exists and is a directory
@ -781,7 +699,7 @@ func (r *BucketReconciler) annotatedEventLogf(ctx context.Context,
// bucket using the given provider, while filtering them using .sourceignore
// rules. After fetching an object, the etag value in the index is updated to
// the current value to ensure accuracy.
func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error {
func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *index.Digester, tempDir string) error {
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()
@ -835,7 +753,7 @@ func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.
// using the given provider, and stores them into tempDir. It downloads in
// parallel, but limited to the maxConcurrentBucketFetches.
// Given an index is provided, the bucket is assumed to exist.
func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error {
func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *index.Digester, tempDir string) error {
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()
@ -879,3 +797,10 @@ func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1
return nil
}
func backwardsCompatibleDigest(d string) digest.Digest {
if !strings.Contains(d, ":") {
d = digest.SHA256.String() + ":" + d
}
return digest.Digest(d)
}

View File

@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/index"
)
type mockBucketObject struct {
@ -88,8 +89,8 @@ func (m *mockBucketClient) addObject(key string, object mockBucketObject) {
m.objects[key] = object
}
func (m *mockBucketClient) objectsToEtagIndex() *etagIndex {
i := newEtagIndex()
func (m *mockBucketClient) objectsToDigestIndex() *index.Digester {
i := index.NewDigester()
for k, v := range m.objects {
i.Add(k, v.etag)
}
@ -114,7 +115,7 @@ func Test_fetchEtagIndex(t *testing.T) {
client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"})
client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"})
index := newEtagIndex()
index := index.NewDigester()
err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
t.Fatal(err)
@ -128,7 +129,7 @@ func Test_fetchEtagIndex(t *testing.T) {
client := mockBucketClient{bucketName: "other-bucket-name"}
index := newEtagIndex()
index := index.NewDigester()
err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
assert.ErrorContains(t, err, "not found")
})
@ -141,7 +142,7 @@ func Test_fetchEtagIndex(t *testing.T) {
client.addObject("foo.yaml", mockBucketObject{etag: "etag1", data: "foo.yaml"})
client.addObject("foo.txt", mockBucketObject{etag: "etag2", data: "foo.txt"})
index := newEtagIndex()
index := index.NewDigester()
err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
t.Fatal(err)
@ -168,7 +169,7 @@ func Test_fetchEtagIndex(t *testing.T) {
bucket := bucket.DeepCopy()
bucket.Spec.Ignore = &ignore
index := newEtagIndex()
index := index.NewDigester()
err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
t.Fatal(err)
@ -203,7 +204,7 @@ func Test_fetchFiles(t *testing.T) {
client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"})
client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"})
index := client.objectsToEtagIndex()
index := client.objectsToDigestIndex()
err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
@ -225,7 +226,7 @@ func Test_fetchFiles(t *testing.T) {
client := mockBucketClient{bucketName: bucketName, objects: map[string]mockBucketObject{}}
client.objects["error"] = mockBucketObject{}
err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), client.objectsToEtagIndex(), tmp)
err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), client.objectsToDigestIndex(), tmp)
if err == nil {
t.Fatal("expected error but got nil")
}
@ -237,7 +238,7 @@ func Test_fetchFiles(t *testing.T) {
client := mockBucketClient{bucketName: bucketName}
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag2"})
index := newEtagIndex()
index := index.NewDigester()
index.Add("foo.yaml", "etag1")
err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
@ -253,7 +254,7 @@ func Test_fetchFiles(t *testing.T) {
client := mockBucketClient{bucketName: bucketName}
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"})
index := newEtagIndex()
index := index.NewDigester()
index.Add("foo.yaml", "etag1")
// Does not exist on server
index.Add("bar.yaml", "etag2")
@ -276,7 +277,7 @@ func Test_fetchFiles(t *testing.T) {
f := fmt.Sprintf("file-%d", i)
client.addObject(f, mockBucketObject{etag: f, data: f})
}
index := client.objectsToEtagIndex()
index := client.objectsToDigestIndex()
err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {

View File

@ -43,6 +43,8 @@ import (
"github.com/fluxcd/pkg/runtime/patch"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
intdigest "github.com/fluxcd/source-controller/internal/digest"
"github.com/fluxcd/source-controller/internal/index"
gcsmock "github.com/fluxcd/source-controller/internal/mock/gcs"
s3mock "github.com/fluxcd/source-controller/internal/mock/s3"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
@ -297,7 +299,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
g.Expect(r.Client.Delete(context.TODO(), obj)).ToNot(HaveOccurred())
}()
index := newEtagIndex()
index := index.NewDigester()
sp := patch.NewSerialPatcher(obj, r.Client)
got, err := r.reconcileStorage(context.TODO(), sp, obj, index, "")
@ -336,7 +338,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
beforeFunc func(obj *sourcev1.Bucket)
want sreconcile.Result
wantErr bool
assertIndex *etagIndex
assertIndex *index.Digester
assertConditions []metav1.Condition
}{
{
@ -351,14 +353,12 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
},
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
},
},
// TODO(hidde): middleware for mock server
@ -377,7 +377,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
},
wantErr: true,
assertIndex: newEtagIndex(),
assertIndex: index.NewDigester(),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
@ -400,7 +400,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
},
wantErr: true,
assertIndex: newEtagIndex(),
assertIndex: index.NewDigester(),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid 'dummy' secret data: required fields 'accesskey' and 'secretkey'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
@ -416,7 +416,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
},
wantErr: true,
assertIndex: newEtagIndex(),
assertIndex: index.NewDigester(),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' not found"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
@ -432,7 +432,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
},
wantErr: true,
assertIndex: newEtagIndex(),
assertIndex: index.NewDigester(),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to confirm existence of 'unavailable' bucket"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
@ -463,14 +463,12 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
},
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
},
},
{
@ -501,15 +499,13 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
},
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"ignored/file.txt": "f08907038338288420ae7dc2d30c0497",
"included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"ignored/file.txt": "f08907038338288420ae7dc2d30c0497",
"included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
},
},
{
@ -531,11 +527,9 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
},
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
@ -556,14 +550,12 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
},
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
},
},
{
@ -584,15 +576,13 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
}
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
},
},
}
@ -650,7 +640,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
g.Expect(r.Client.Delete(context.TODO(), obj)).ToNot(HaveOccurred())
}()
index := newEtagIndex()
index := index.NewDigester()
sp := patch.NewSerialPatcher(obj, r.Client)
got, err := r.reconcileSource(context.TODO(), sp, obj, index, tmpDir)
@ -676,7 +666,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
beforeFunc func(obj *sourcev1.Bucket)
want sreconcile.Result
wantErr bool
assertIndex *etagIndex
assertIndex *index.Digester
assertConditions []metav1.Condition
}{
{
@ -706,14 +696,12 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
}
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
},
},
{
@ -728,7 +716,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
},
want: sreconcile.ResultEmpty,
wantErr: true,
assertIndex: newEtagIndex(),
assertIndex: index.NewDigester(),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
@ -752,7 +740,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
},
want: sreconcile.ResultEmpty,
wantErr: true,
assertIndex: newEtagIndex(),
assertIndex: index.NewDigester(),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid 'dummy' secret data: required fields"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
@ -769,7 +757,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
},
want: sreconcile.ResultEmpty,
wantErr: true,
assertIndex: newEtagIndex(),
assertIndex: index.NewDigester(),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' not found"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
@ -786,7 +774,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
},
want: sreconcile.ResultEmpty,
wantErr: true,
assertIndex: newEtagIndex(),
assertIndex: index.NewDigester(),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to confirm existence of 'unavailable' bucket"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
@ -817,14 +805,12 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
},
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
},
},
{
@ -855,15 +841,13 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
},
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"ignored/file.txt": "f08907038338288420ae7dc2d30c0497",
"included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"ignored/file.txt": "f08907038338288420ae7dc2d30c0497",
"included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
},
},
{
@ -885,11 +869,9 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
},
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
@ -910,14 +892,12 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
},
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
},
},
{
@ -938,15 +918,13 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
}
},
want: sreconcile.ResultSuccess,
assertIndex: &etagIndex{
index: map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
},
},
assertIndex: index.NewDigester(index.WithIndex(map[string]string{
"test.txt": "098f6bcd4621d373cade4e832627b4f6",
})),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
},
},
// TODO: Middleware for mock server to test authentication using secret.
@ -1009,11 +987,10 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
g.Expect(r.Client.Delete(context.TODO(), obj)).ToNot(HaveOccurred())
}()
index := newEtagIndex()
index := index.NewDigester()
sp := patch.NewSerialPatcher(obj, r.Client)
got, err := r.reconcileSource(context.TODO(), sp, obj, index, tmpDir)
t.Log(err)
g.Expect(err != nil).To(Equal(tt.wantErr))
g.Expect(got).To(Equal(tt.want))
@ -1030,7 +1007,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
func TestBucketReconciler_reconcileArtifact(t *testing.T) {
tests := []struct {
name string
beforeFunc func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string)
beforeFunc func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string)
afterFunc func(t *WithT, obj *sourcev1.Bucket, dir string)
want sreconcile.Result
wantErr bool
@ -1038,25 +1015,25 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
}{
{
name: "Archiving artifact to storage makes ArtifactInStorage=True",
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) {
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) {
obj.Spec.Interval = metav1.Duration{Duration: interval}
conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
},
want: sreconcile.ResultSuccess,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
},
},
{
name: "Up-to-date artifact should not persist and update status",
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) {
revision, _ := index.Revision()
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) {
revision := index.Digest(intdigest.Canonical)
obj.Spec.Interval = metav1.Duration{Duration: interval}
// Incomplete artifact
obj.Status.Artifact = &sourcev1.Artifact{Revision: revision}
obj.Status.Artifact = &sourcev1.Artifact{Revision: revision.String()}
conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
},
@ -1066,14 +1043,14 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
},
want: sreconcile.ResultSuccess,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
},
},
{
name: "Removes ArtifactOutdatedCondition after creating a new artifact",
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) {
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) {
obj.Spec.Interval = metav1.Duration{Duration: interval}
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "Foo", "")
conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
@ -1081,14 +1058,14 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
},
want: sreconcile.ResultSuccess,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
},
},
{
name: "Creates latest symlink to the created artifact",
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) {
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) {
obj.Spec.Interval = metav1.Duration{Duration: interval}
conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
@ -1102,14 +1079,14 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
},
want: sreconcile.ResultSuccess,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
},
},
{
name: "Dir path deleted",
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) {
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) {
t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred())
conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
@ -1124,7 +1101,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
},
{
name: "Dir path is not a directory",
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) {
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) {
// Remove the given directory and create a file for the same
// path.
t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred())
@ -1174,7 +1151,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
},
}
index := newEtagIndex()
index := index.NewDigester()
if tt.beforeFunc != nil {
tt.beforeFunc(g, obj, index, tmpDir)
@ -1206,57 +1183,6 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
}
}
func Test_etagIndex_Revision(t *testing.T) {
tests := []struct {
name string
list map[string]string
want string
wantErr bool
}{
{
name: "index with items",
list: map[string]string{
"one": "one",
"two": "two",
"three": "three",
},
want: "c0837b3f32bb67c5275858fdb96595f87801cf3c2f622c049918a051d29b2c7f",
},
{
name: "index with items in different order",
list: map[string]string{
"three": "three",
"one": "one",
"two": "two",
},
want: "c0837b3f32bb67c5275858fdb96595f87801cf3c2f622c049918a051d29b2c7f",
},
{
name: "empty index",
list: map[string]string{},
want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
},
{
name: "nil index",
list: nil,
want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
index := &etagIndex{index: tt.list}
got, err := index.Revision()
if (err != nil) != tt.wantErr {
t.Errorf("revision() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("revision() got = %v, want %v", got, tt.want)
}
})
}
}
func TestBucketReconciler_statusConditions(t *testing.T) {
tests := []struct {
name string
@ -1439,12 +1365,10 @@ func TestBucketReconciler_notify(t *testing.T) {
EventRecorder: recorder,
patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"),
}
index := &etagIndex{
index: map[string]string{
"zzz": "qqq",
"bbb": "ddd",
},
}
index := index.NewDigester(index.WithIndex(map[string]string{
"zzz": "qqq",
"bbb": "ddd",
}))
reconciler.notify(ctx, oldObj, newObj, index, tt.res, tt.resErr)
select {

221
internal/index/digest.go Normal file
View File

@ -0,0 +1,221 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package index
import (
"fmt"
"io"
"sort"
"strings"
"sync"
"github.com/opencontainers/go-digest"
)
// Digester is a simple string key value index that can be used to calculate
// digests of the index. The digests are cached, and only recalculated if the
// index has changed.
type Digester struct {
// index is the map of keys and their associated values.
index map[string]string
// digests is a cache of digests calculated for the index.
digests map[digest.Algorithm]digest.Digest
mu sync.RWMutex
}
// DigesterOption is a functional option for configuring a digester.
type DigesterOption func(*Digester)
// WithIndex returns a DigesterOption that sets the index to the provided map.
// The map is copied, so any changes to the map after the option is applied
// will not be reflected in the index.
func WithIndex(i map[string]string) DigesterOption {
return func(d *Digester) {
if i != nil {
d.mu.Lock()
defer d.mu.Unlock()
if d.index == nil {
d.index = make(map[string]string, len(i))
}
for k, v := range i {
d.index[k] = v
}
d.reset()
}
}
}
// NewDigester returns a new digest index with an empty initialized index.
func NewDigester(opts ...DigesterOption) *Digester {
d := &Digester{
digests: make(map[digest.Algorithm]digest.Digest, 0),
index: make(map[string]string, 0),
}
for _, opt := range opts {
opt(d)
}
return d
}
// Add adds the key and digest to the index.
func (i *Digester) Add(key, value string) {
i.mu.Lock()
defer i.mu.Unlock()
i.index[key] = value
i.reset()
}
// Delete removes the key from the index.
func (i *Digester) Delete(key string) {
i.mu.Lock()
defer i.mu.Unlock()
if _, ok := i.index[key]; ok {
delete(i.index, key)
i.reset()
}
}
// Get returns the digest for the key, or an empty digest if the key is not
// found.
func (i *Digester) Get(key string) string {
i.mu.RLock()
defer i.mu.RUnlock()
return i.index[key]
}
// Has returns true if the index contains the key.
func (i *Digester) Has(key string) bool {
i.mu.RLock()
defer i.mu.RUnlock()
_, ok := i.index[key]
return ok
}
// Index returns a copy of the index.
func (i *Digester) Index() map[string]string {
i.mu.RLock()
defer i.mu.RUnlock()
index := make(map[string]string, len(i.index))
for k, v := range i.index {
index[k] = v
}
return index
}
// Len returns the number of keys in the index.
func (i *Digester) Len() int {
i.mu.RLock()
defer i.mu.RUnlock()
return len(i.index)
}
// String returns a string representation of the index. The keys are stable
// sorted, and the string representation of the key/value pairs is written,
// each pair on a newline with a space between them.
func (i *Digester) String() string {
i.mu.RLock()
defer i.mu.RUnlock()
keys := i.sortedKeys()
var b strings.Builder
for _, k := range keys {
b.Grow(len(k) + len(i.index[k]) + 2)
writeLine(&b, k, i.index[k])
}
return b.String()
}
// WriteTo writes the index to the writer. The keys are stable sorted, and the
// string representation of the key/value pairs is written, each pair on a
// newline with a space between them.
func (i *Digester) WriteTo(w io.Writer) (int64, error) {
i.mu.RLock()
defer i.mu.RUnlock()
keys := i.sortedKeys()
var n int64
for _, k := range keys {
nn, err := writeLine(w, k, i.index[k])
n += int64(nn)
if err != nil {
return n, err
}
}
return n, nil
}
// Digest returns the digest of the index using the provided algorithm.
// If the index has not changed since the last call to Digest, the cached
// digest is returned.
// For verifying the index against a known digest, use Verify.
func (i *Digester) Digest(a digest.Algorithm) digest.Digest {
i.mu.Lock()
defer i.mu.Unlock()
if _, ok := i.digests[a]; !ok {
digester := a.Digester()
keys := i.sortedKeys()
for _, k := range keys {
_, _ = writeLine(digester.Hash(), k, i.index[k])
}
i.digests[a] = digester.Digest()
}
return i.digests[a]
}
// Verify returns true if the index matches the provided digest.
func (i *Digester) Verify(d digest.Digest) bool {
i.mu.RLock()
defer i.mu.RUnlock()
verifier := d.Verifier()
keys := i.sortedKeys()
for _, k := range keys {
_, _ = writeLine(verifier, k, i.index[k])
}
return verifier.Verified()
}
// sortedKeys returns a slice of the keys in the index, sorted alphabetically.
func (i *Digester) sortedKeys() []string {
keys := make([]string, 0, len(i.index))
for k := range i.index {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}
// reset clears the digests cache.
func (i *Digester) reset() {
i.digests = make(map[digest.Algorithm]digest.Digest, 0)
}
// writeLine writes the key and digest to the writer, separated by a space and
// terminating with a newline.
func writeLine(w io.Writer, key, value string) (int, error) {
return fmt.Fprintf(w, "%s %s\n", key, value)
}

View File

@ -0,0 +1,346 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package index
import (
"bytes"
"errors"
"testing"
. "github.com/onsi/gomega"
"github.com/opencontainers/go-digest"
)
func TestWithIndex(t *testing.T) {
t.Run("sets the index", func(t *testing.T) {
g := NewWithT(t)
i := map[string]string{"foo": "bar"}
d := &Digester{}
WithIndex(i)(d)
g.Expect(d.index).To(Equal(i))
})
t.Run("resets the digests", func(t *testing.T) {
g := NewWithT(t)
i := map[string]string{"foo": "bar"}
d := &Digester{
digests: map[digest.Algorithm]digest.Digest{
digest.SHA256: "sha256:foo",
},
}
WithIndex(i)(d)
g.Expect(d.digests).To(BeEmpty())
})
}
func TestNewDigester(t *testing.T) {
t.Run("default", func(t *testing.T) {
g := NewWithT(t)
d := NewDigester()
g.Expect(d).ToNot(BeNil())
g.Expect(d.index).ToNot(BeNil())
g.Expect(d.digests).ToNot(BeNil())
})
t.Run("with index", func(t *testing.T) {
g := NewWithT(t)
i := map[string]string{"foo": "bar"}
d := NewDigester(WithIndex(i))
g.Expect(d).ToNot(BeNil())
g.Expect(d.index).To(Equal(i))
g.Expect(d.digests).ToNot(BeNil())
})
}
func TestDigester_Add(t *testing.T) {
t.Run("adds", func(t *testing.T) {
g := NewWithT(t)
d := NewDigester()
d.Add("foo", "bar")
g.Expect(d.index).To(HaveKeyWithValue("foo", "bar"))
})
t.Run("overwrites", func(t *testing.T) {
g := NewWithT(t)
d := NewDigester()
d.Add("foo", "bar")
d.Add("foo", "baz")
g.Expect(d.index).To(HaveKeyWithValue("foo", "baz"))
})
t.Run("resets digests", func(t *testing.T) {
g := NewWithT(t)
d := &Digester{
index: map[string]string{},
digests: map[digest.Algorithm]digest.Digest{
digest.SHA256: "sha256:foo",
},
}
d.Add("foo", "bar")
g.Expect(d.digests).To(BeEmpty())
})
}
func TestDigester_Delete(t *testing.T) {
t.Run("deletes", func(t *testing.T) {
g := NewWithT(t)
d := NewDigester()
d.Add("foo", "bar")
d.Delete("foo")
g.Expect(d.index).ToNot(HaveKey("foo"))
})
t.Run("resets digests", func(t *testing.T) {
g := NewWithT(t)
d := &Digester{
index: map[string]string{
"foo": "bar",
},
digests: map[digest.Algorithm]digest.Digest{
digest.SHA256: "sha256:foo",
},
}
d.Delete("nop")
g.Expect(d.digests).To(HaveLen(1))
d.Delete("foo")
g.Expect(d.digests).To(BeEmpty())
})
}
func TestDigester_Get(t *testing.T) {
g := NewWithT(t)
d := NewDigester()
d.Add("foo", "bar")
g.Expect(d.Get("foo")).To(Equal("bar"))
g.Expect(d.Get("bar")).To(BeEmpty())
}
func TestDigester_Has(t *testing.T) {
g := NewWithT(t)
d := NewDigester()
d.Add("foo", "bar")
g.Expect(d.Has("foo")).To(BeTrue())
g.Expect(d.Has("bar")).To(BeFalse())
}
func TestDigester_Index(t *testing.T) {
g := NewWithT(t)
i := map[string]string{
"foo": "bar",
"bar": "baz",
}
d := NewDigester(WithIndex(i))
iCopy := d.Index()
g.Expect(iCopy).To(Equal(i))
g.Expect(iCopy).ToNot(BeIdenticalTo(i))
}
func TestDigester_Len(t *testing.T) {
g := NewWithT(t)
d := NewDigester(WithIndex(map[string]string{
"foo": "bar",
"bar": "baz",
}))
g.Expect(d.Len()).To(Equal(2))
}
func TestDigester_String(t *testing.T) {
g := NewWithT(t)
d := NewDigester(WithIndex(map[string]string{
"foo": "bar",
"bar": "baz",
}))
g.Expect(d.String()).To(Equal(`bar baz
foo bar
`))
}
func TestDigester_WriteTo(t *testing.T) {
t.Run("writes", func(t *testing.T) {
g := NewWithT(t)
d := NewDigester(WithIndex(map[string]string{
"foo": "bar",
"bar": "baz",
}))
expect := `bar baz
foo bar
`
var buf bytes.Buffer
n, err := d.WriteTo(&buf)
g.Expect(n).To(Equal(int64(len(expect))))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(buf.String()).To(Equal(expect))
})
t.Run("errors", func(t *testing.T) {
g := NewWithT(t)
d := NewDigester(WithIndex(map[string]string{
"foo": "bar",
"bar": "baz",
}))
w := &fakeWriter{
err: errors.New("write error"),
written: 5,
}
n, err := d.WriteTo(w)
g.Expect(err).To(HaveOccurred())
g.Expect(errors.Is(err, w.err)).To(BeTrue())
g.Expect(n).To(Equal(int64(w.written)))
})
}
func TestDigester_Digest(t *testing.T) {
t.Run("returns digest", func(t *testing.T) {
g := NewWithT(t)
d := NewDigester(WithIndex(map[string]string{
"foo": "bar",
"bar": "baz",
}))
expect := digest.SHA256.FromString(d.String())
g.Expect(d.Digest(digest.SHA256)).To(Equal(expect))
g.Expect(d.digests).To(HaveKeyWithValue(digest.SHA256, expect))
})
t.Run("returns cached digest", func(t *testing.T) {
g := NewWithT(t)
d := &Digester{
index: map[string]string{
"foo": "bar",
"bar": "baz",
},
digests: map[digest.Algorithm]digest.Digest{
digest.SHA256: "sha256:foo",
},
}
g.Expect(d.Digest(digest.SHA256)).To(Equal(d.digests[digest.SHA256]))
})
}
func TestDigester_Verify(t *testing.T) {
g := NewWithT(t)
d := NewDigester(WithIndex(map[string]string{
"foo": "bar",
}))
g.Expect(d.Verify(d.Digest(digest.SHA256))).To(BeTrue())
g.Expect(d.Verify(digest.SHA256.FromString("different"))).To(BeFalse())
}
func TestDigester_sortedKeys(t *testing.T) {
g := NewWithT(t)
d := NewDigester(WithIndex(map[string]string{
"c/d/e": "bar",
"a/b/c": "baz",
"f/g/h": "foo",
}))
g.Expect(d.sortedKeys()).To(Equal([]string{
"a/b/c",
"c/d/e",
"f/g/h",
}))
}
func TestDigester_reset(t *testing.T) {
g := NewWithT(t)
d := NewDigester()
d.digests = map[digest.Algorithm]digest.Digest{
digest.SHA256: "sha256:foo",
}
d.reset()
g.Expect(d.digests).To(BeEmpty())
}
func Test_writeLine(t *testing.T) {
t.Run("writes", func(t *testing.T) {
g := NewWithT(t)
var buf bytes.Buffer
n, err := writeLine(&buf, "foo", "bar")
g.Expect(n).To(Equal(8))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(buf.String()).To(Equal(`foo bar
`))
})
t.Run("errors", func(t *testing.T) {
g := NewWithT(t)
w := &fakeWriter{
err: errors.New("write error"),
written: 5,
}
n, err := writeLine(w, "foo", "bar")
g.Expect(err).To(HaveOccurred())
g.Expect(errors.Is(err, w.err)).To(BeTrue())
g.Expect(n).To(Equal(w.written))
})
}
type fakeWriter struct {
written int
err error
}
func (f *fakeWriter) Write(p []byte) (n int, err error) {
return f.written, f.err
}