From 83b6fdcdd85231c321e5e1f4307661db6f72492f Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Fri, 11 Nov 2022 14:20:43 +0000 Subject: [PATCH] controllers: use digest for Bucket revision Signed-off-by: Hidde Beydals --- controllers/bucket_controller.go | 171 +++------- controllers/bucket_controller_fetch_test.go | 23 +- controllers/bucket_controller_test.go | 262 ++++++--------- internal/index/digest.go | 221 +++++++++++++ internal/index/digest_test.go | 346 ++++++++++++++++++++ 5 files changed, 720 insertions(+), 303 deletions(-) create mode 100644 internal/index/digest.go create mode 100644 internal/index/digest_test.go diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index a8f2074d..96903e3c 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -18,17 +18,14 @@ package controllers import ( "context" - "crypto/sha256" "errors" "fmt" "os" "path/filepath" - "sort" "strings" - "sync" "time" - "github.com/fluxcd/source-controller/pkg/azure" + "github.com/opencontainers/go-digest" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" corev1 "k8s.io/api/core/v1" @@ -51,10 +48,14 @@ import ( eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" "github.com/fluxcd/pkg/sourceignore" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + intdigest "github.com/fluxcd/source-controller/internal/digest" serror "github.com/fluxcd/source-controller/internal/error" + "github.com/fluxcd/source-controller/internal/index" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" + "github.com/fluxcd/source-controller/pkg/azure" "github.com/fluxcd/source-controller/pkg/gcp" "github.com/fluxcd/source-controller/pkg/minio" ) @@ -154,83 +155,7 @@ type BucketProvider interface { // bucketReconcileFunc is the function type for all the v1beta2.Bucket // (sub)reconcile functions. The type implementations are grouped and // executed serially to perform the complete reconcile of the object. -type bucketReconcileFunc func(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) - -// etagIndex is an index of storage object keys and their Etag values. -type etagIndex struct { - sync.RWMutex - index map[string]string -} - -// newEtagIndex returns a new etagIndex with an empty initialized index. -func newEtagIndex() *etagIndex { - return &etagIndex{ - index: make(map[string]string), - } -} - -func (i *etagIndex) Add(key, etag string) { - i.Lock() - defer i.Unlock() - i.index[key] = etag -} - -func (i *etagIndex) Delete(key string) { - i.Lock() - defer i.Unlock() - delete(i.index, key) -} - -func (i *etagIndex) Get(key string) string { - i.RLock() - defer i.RUnlock() - return i.index[key] -} - -func (i *etagIndex) Has(key string) bool { - i.RLock() - defer i.RUnlock() - _, ok := i.index[key] - return ok -} - -func (i *etagIndex) Index() map[string]string { - i.RLock() - defer i.RUnlock() - index := make(map[string]string) - for k, v := range i.index { - index[k] = v - } - return index -} - -func (i *etagIndex) Len() int { - i.RLock() - defer i.RUnlock() - return len(i.index) -} - -// Revision calculates the SHA256 checksum of the index. -// The keys are stable sorted, and the SHA256 sum is then calculated for the -// string representation of the key/value pairs, each pair written on a newline -// with a space between them. The sum result is returned as a string. -func (i *etagIndex) Revision() (string, error) { - i.RLock() - defer i.RUnlock() - keyIndex := make([]string, 0, len(i.index)) - for k := range i.index { - keyIndex = append(keyIndex, k) - } - - sort.Strings(keyIndex) - sum := sha256.New() - for _, k := range keyIndex { - if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i.index[k]))); err != nil { - return "", err - } - } - return fmt.Sprintf("%x", sum.Sum(nil)), nil -} +type bucketReconcileFunc func(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *index.Digester, dir string) (sreconcile.Result, error) func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error { return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{}) @@ -371,7 +296,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, sp *patch.SerialPatche var ( res sreconcile.Result resErr error - index = newEtagIndex() + index = index.NewDigester() ) for _, rec := range reconcilers { @@ -397,7 +322,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, sp *patch.SerialPatche } // notify emits notification related to the reconciliation. -func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.Bucket, index *etagIndex, res sreconcile.Result, resErr error) { +func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.Bucket, index *index.Digester, res sreconcile.Result, resErr error) { // Notify successful reconciliation for new artifact and recovery from any // failure. if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { @@ -443,7 +368,7 @@ func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1. // condition is added. // The hostname of any URL in the Status of the object are updated, to ensure // they match the Storage server hostname of current runtime. -func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) { +func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, _ *index.Digester, _ string) (sreconcile.Result, error) { // Garbage collect previous advertised artifact(s) from storage _ = r.garbageCollect(ctx, obj) @@ -484,7 +409,7 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.Seria // When a SecretRef is defined, it attempts to fetch the Secret before calling // the provider. If this fails, it records v1beta2.FetchFailedCondition=True on // the object and returns early. -func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) { +func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *index.Digester, dir string) (sreconcile.Result, error) { secret, err := r.getBucketSecret(ctx, obj) if err != nil { e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason} @@ -538,26 +463,21 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial return sreconcile.ResultEmpty, e } - // Calculate revision - revision, err := index.Revision() - if err != nil { - return sreconcile.ResultEmpty, &serror.Event{ - Err: fmt.Errorf("failed to calculate revision: %w", err), - Reason: meta.FailedReason, - } + // Check if index has changed compared to current Artifact revision. + var changed bool + if artifact := obj.Status.Artifact; artifact != nil && artifact.Revision != "" { + curRev := backwardsCompatibleDigest(artifact.Revision) + changed = curRev != index.Digest(curRev.Algorithm()) } - // Mark observations about the revision on the object - defer func() { - // As fetchIndexFiles can make last-minute modifications to the etag - // index, we need to re-calculate the revision at the end - revision, err := index.Revision() - if err != nil { - ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision after fetching etag index") - return - } + // Fetch the bucket objects if required to. + if artifact := obj.GetArtifact(); artifact == nil || changed { + // Mark observations about the revision on the object + defer func() { + // As fetchIndexFiles can make last-minute modifications to the etag + // index, we need to re-calculate the revision at the end + revision := index.Digest(intdigest.Canonical) - if !obj.GetArtifact().HasRevision(revision) { message := fmt.Sprintf("new upstream revision '%s'", revision) if obj.GetArtifact() != nil { conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message) @@ -567,10 +487,8 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial ctrl.LoggerFrom(ctx).Error(err, "failed to patch") return } - } - }() + }() - if !obj.GetArtifact().HasRevision(revision) { if err = fetchIndexFiles(ctx, provider, obj, index, dir); err != nil { e := &serror.Event{Err: err, Reason: sourcev1.BucketOperationFailedReason} conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) @@ -591,32 +509,32 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial // early. // On a successful archive, the Artifact in the Status of the object is set, // and the symlink in the Storage is updated to its path. -func (r *BucketReconciler) reconcileArtifact(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) { +func (r *BucketReconciler) reconcileArtifact(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *index.Digester, dir string) (sreconcile.Result, error) { // Calculate revision - revision, err := index.Revision() - if err != nil { - return sreconcile.ResultEmpty, &serror.Event{ - Err: fmt.Errorf("failed to calculate revision of new artifact: %w", err), - Reason: meta.FailedReason, - } - } + revision := index.Digest(intdigest.Canonical) // Create artifact - artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision)) + artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision.String(), fmt.Sprintf("%s.tar.gz", revision.Encoded())) // Set the ArtifactInStorageCondition if there's no drift. defer func() { - if obj.GetArtifact().HasRevision(artifact.Revision) { - conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition) - conditions.MarkTrue(obj, sourcev1.ArtifactInStorageCondition, meta.SucceededReason, - "stored artifact: revision '%s'", artifact.Revision) + if curArtifact := obj.GetArtifact(); curArtifact != nil && curArtifact.Revision != "" { + curRev := backwardsCompatibleDigest(curArtifact.Revision) + if index.Digest(curRev.Algorithm()) == curRev { + conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition) + conditions.MarkTrue(obj, sourcev1.ArtifactInStorageCondition, meta.SucceededReason, + "stored artifact: revision '%s'", artifact.Revision) + } } }() // The artifact is up-to-date - if obj.GetArtifact().HasRevision(artifact.Revision) { - r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision) - return sreconcile.ResultSuccess, nil + if curArtifact := obj.GetArtifact(); curArtifact != nil && curArtifact.Revision != "" { + curRev := backwardsCompatibleDigest(curArtifact.Revision) + if index.Digest(curRev.Algorithm()) == curRev { + r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision) + return sreconcile.ResultSuccess, nil + } } // Ensure target path exists and is a directory @@ -781,7 +699,7 @@ func (r *BucketReconciler) annotatedEventLogf(ctx context.Context, // bucket using the given provider, while filtering them using .sourceignore // rules. After fetching an object, the etag value in the index is updated to // the current value to ensure accuracy. -func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error { +func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *index.Digester, tempDir string) error { ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) defer cancel() @@ -835,7 +753,7 @@ func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1. // using the given provider, and stores them into tempDir. It downloads in // parallel, but limited to the maxConcurrentBucketFetches. // Given an index is provided, the bucket is assumed to exist. -func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error { +func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *index.Digester, tempDir string) error { ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) defer cancel() @@ -879,3 +797,10 @@ func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1 return nil } + +func backwardsCompatibleDigest(d string) digest.Digest { + if !strings.Contains(d, ":") { + d = digest.SHA256.String() + ":" + d + } + return digest.Digest(d) +} diff --git a/controllers/bucket_controller_fetch_test.go b/controllers/bucket_controller_fetch_test.go index 0dfaa005..ad9b6ffd 100644 --- a/controllers/bucket_controller_fetch_test.go +++ b/controllers/bucket_controller_fetch_test.go @@ -28,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + "github.com/fluxcd/source-controller/internal/index" ) type mockBucketObject struct { @@ -88,8 +89,8 @@ func (m *mockBucketClient) addObject(key string, object mockBucketObject) { m.objects[key] = object } -func (m *mockBucketClient) objectsToEtagIndex() *etagIndex { - i := newEtagIndex() +func (m *mockBucketClient) objectsToDigestIndex() *index.Digester { + i := index.NewDigester() for k, v := range m.objects { i.Add(k, v.etag) } @@ -114,7 +115,7 @@ func Test_fetchEtagIndex(t *testing.T) { client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"}) client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"}) - index := newEtagIndex() + index := index.NewDigester() err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) if err != nil { t.Fatal(err) @@ -128,7 +129,7 @@ func Test_fetchEtagIndex(t *testing.T) { client := mockBucketClient{bucketName: "other-bucket-name"} - index := newEtagIndex() + index := index.NewDigester() err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) assert.ErrorContains(t, err, "not found") }) @@ -141,7 +142,7 @@ func Test_fetchEtagIndex(t *testing.T) { client.addObject("foo.yaml", mockBucketObject{etag: "etag1", data: "foo.yaml"}) client.addObject("foo.txt", mockBucketObject{etag: "etag2", data: "foo.txt"}) - index := newEtagIndex() + index := index.NewDigester() err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) if err != nil { t.Fatal(err) @@ -168,7 +169,7 @@ func Test_fetchEtagIndex(t *testing.T) { bucket := bucket.DeepCopy() bucket.Spec.Ignore = &ignore - index := newEtagIndex() + index := index.NewDigester() err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) if err != nil { t.Fatal(err) @@ -203,7 +204,7 @@ func Test_fetchFiles(t *testing.T) { client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"}) client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"}) - index := client.objectsToEtagIndex() + index := client.objectsToDigestIndex() err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp) if err != nil { @@ -225,7 +226,7 @@ func Test_fetchFiles(t *testing.T) { client := mockBucketClient{bucketName: bucketName, objects: map[string]mockBucketObject{}} client.objects["error"] = mockBucketObject{} - err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), client.objectsToEtagIndex(), tmp) + err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), client.objectsToDigestIndex(), tmp) if err == nil { t.Fatal("expected error but got nil") } @@ -237,7 +238,7 @@ func Test_fetchFiles(t *testing.T) { client := mockBucketClient{bucketName: bucketName} client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag2"}) - index := newEtagIndex() + index := index.NewDigester() index.Add("foo.yaml", "etag1") err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp) if err != nil { @@ -253,7 +254,7 @@ func Test_fetchFiles(t *testing.T) { client := mockBucketClient{bucketName: bucketName} client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"}) - index := newEtagIndex() + index := index.NewDigester() index.Add("foo.yaml", "etag1") // Does not exist on server index.Add("bar.yaml", "etag2") @@ -276,7 +277,7 @@ func Test_fetchFiles(t *testing.T) { f := fmt.Sprintf("file-%d", i) client.addObject(f, mockBucketObject{etag: f, data: f}) } - index := client.objectsToEtagIndex() + index := client.objectsToDigestIndex() err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp) if err != nil { diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go index b7a342a6..606871f1 100644 --- a/controllers/bucket_controller_test.go +++ b/controllers/bucket_controller_test.go @@ -43,6 +43,8 @@ import ( "github.com/fluxcd/pkg/runtime/patch" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + intdigest "github.com/fluxcd/source-controller/internal/digest" + "github.com/fluxcd/source-controller/internal/index" gcsmock "github.com/fluxcd/source-controller/internal/mock/gcs" s3mock "github.com/fluxcd/source-controller/internal/mock/s3" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" @@ -297,7 +299,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { g.Expect(r.Client.Delete(context.TODO(), obj)).ToNot(HaveOccurred()) }() - index := newEtagIndex() + index := index.NewDigester() sp := patch.NewSerialPatcher(obj, r.Client) got, err := r.reconcileStorage(context.TODO(), sp, obj, index, "") @@ -336,7 +338,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { beforeFunc func(obj *sourcev1.Bucket) want sreconcile.Result wantErr bool - assertIndex *etagIndex + assertIndex *index.Digester assertConditions []metav1.Condition }{ { @@ -351,14 +353,12 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "test.txt": "098f6bcd4621d373cade4e832627b4f6", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + })), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), - *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, // TODO(hidde): middleware for mock server @@ -377,7 +377,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar") }, wantErr: true, - assertIndex: newEtagIndex(), + assertIndex: index.NewDigester(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), @@ -400,7 +400,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar") }, wantErr: true, - assertIndex: newEtagIndex(), + assertIndex: index.NewDigester(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid 'dummy' secret data: required fields 'accesskey' and 'secretkey'"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), @@ -416,7 +416,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar") }, wantErr: true, - assertIndex: newEtagIndex(), + assertIndex: index.NewDigester(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' not found"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), @@ -432,7 +432,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar") }, wantErr: true, - assertIndex: newEtagIndex(), + assertIndex: index.NewDigester(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to confirm existence of 'unavailable' bucket"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), @@ -463,14 +463,12 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + })), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), - *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), }, }, { @@ -501,15 +499,13 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "ignored/file.txt": "f08907038338288420ae7dc2d30c0497", - "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "ignored/file.txt": "f08907038338288420ae7dc2d30c0497", + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + })), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), - *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), }, }, { @@ -531,11 +527,9 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "test.txt": "098f6bcd4621d373cade4e832627b4f6", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + })), assertConditions: []metav1.Condition{ *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), *conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"), @@ -556,14 +550,12 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "test.txt": "098f6bcd4621d373cade4e832627b4f6", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + })), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), - *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, { @@ -584,15 +576,13 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { } }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "test.txt": "098f6bcd4621d373cade4e832627b4f6", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + })), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), - *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), - *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, } @@ -650,7 +640,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { g.Expect(r.Client.Delete(context.TODO(), obj)).ToNot(HaveOccurred()) }() - index := newEtagIndex() + index := index.NewDigester() sp := patch.NewSerialPatcher(obj, r.Client) got, err := r.reconcileSource(context.TODO(), sp, obj, index, tmpDir) @@ -676,7 +666,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { beforeFunc func(obj *sourcev1.Bucket) want sreconcile.Result wantErr bool - assertIndex *etagIndex + assertIndex *index.Digester assertConditions []metav1.Condition }{ { @@ -706,14 +696,12 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { } }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "test.txt": "098f6bcd4621d373cade4e832627b4f6", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + })), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), - *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, { @@ -728,7 +716,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { }, want: sreconcile.ResultEmpty, wantErr: true, - assertIndex: newEtagIndex(), + assertIndex: index.NewDigester(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), @@ -752,7 +740,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { }, want: sreconcile.ResultEmpty, wantErr: true, - assertIndex: newEtagIndex(), + assertIndex: index.NewDigester(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid 'dummy' secret data: required fields"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), @@ -769,7 +757,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { }, want: sreconcile.ResultEmpty, wantErr: true, - assertIndex: newEtagIndex(), + assertIndex: index.NewDigester(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' not found"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), @@ -786,7 +774,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { }, want: sreconcile.ResultEmpty, wantErr: true, - assertIndex: newEtagIndex(), + assertIndex: index.NewDigester(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to confirm existence of 'unavailable' bucket"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), @@ -817,14 +805,12 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + })), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), - *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), }, }, { @@ -855,15 +841,13 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "ignored/file.txt": "f08907038338288420ae7dc2d30c0497", - "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "ignored/file.txt": "f08907038338288420ae7dc2d30c0497", + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + })), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), - *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), }, }, { @@ -885,11 +869,9 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "test.txt": "098f6bcd4621d373cade4e832627b4f6", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + })), assertConditions: []metav1.Condition{ *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), *conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"), @@ -910,14 +892,12 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "test.txt": "098f6bcd4621d373cade4e832627b4f6", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + })), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), - *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, { @@ -938,15 +918,13 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { } }, want: sreconcile.ResultSuccess, - assertIndex: &etagIndex{ - index: map[string]string{ - "test.txt": "098f6bcd4621d373cade4e832627b4f6", - }, - }, + assertIndex: index.NewDigester(index.WithIndex(map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + })), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), - *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), - *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'sha256:b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, // TODO: Middleware for mock server to test authentication using secret. @@ -1009,11 +987,10 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { g.Expect(r.Client.Delete(context.TODO(), obj)).ToNot(HaveOccurred()) }() - index := newEtagIndex() + index := index.NewDigester() sp := patch.NewSerialPatcher(obj, r.Client) got, err := r.reconcileSource(context.TODO(), sp, obj, index, tmpDir) - t.Log(err) g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(got).To(Equal(tt.want)) @@ -1030,7 +1007,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { func TestBucketReconciler_reconcileArtifact(t *testing.T) { tests := []struct { name string - beforeFunc func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) + beforeFunc func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) afterFunc func(t *WithT, obj *sourcev1.Bucket, dir string) want sreconcile.Result wantErr bool @@ -1038,25 +1015,25 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }{ { name: "Archiving artifact to storage makes ArtifactInStorage=True", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) { obj.Spec.Interval = metav1.Duration{Duration: interval} conditions.MarkReconciling(obj, meta.ProgressingReason, "foo") conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar") }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), + *conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), *conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"), }, }, { name: "Up-to-date artifact should not persist and update status", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { - revision, _ := index.Revision() + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) { + revision := index.Digest(intdigest.Canonical) obj.Spec.Interval = metav1.Duration{Duration: interval} // Incomplete artifact - obj.Status.Artifact = &sourcev1.Artifact{Revision: revision} + obj.Status.Artifact = &sourcev1.Artifact{Revision: revision.String()} conditions.MarkReconciling(obj, meta.ProgressingReason, "foo") conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar") }, @@ -1066,14 +1043,14 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), + *conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), *conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"), }, }, { name: "Removes ArtifactOutdatedCondition after creating a new artifact", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) { obj.Spec.Interval = metav1.Duration{Duration: interval} conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "Foo", "") conditions.MarkReconciling(obj, meta.ProgressingReason, "foo") @@ -1081,14 +1058,14 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), + *conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), *conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"), }, }, { name: "Creates latest symlink to the created artifact", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) { obj.Spec.Interval = metav1.Duration{Duration: interval} conditions.MarkReconciling(obj, meta.ProgressingReason, "foo") conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar") @@ -1102,14 +1079,14 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), + *conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), *conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"), }, }, { name: "Dir path deleted", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) { t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred()) conditions.MarkReconciling(obj, meta.ProgressingReason, "foo") conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar") @@ -1124,7 +1101,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }, { name: "Dir path is not a directory", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *index.Digester, dir string) { // Remove the given directory and create a file for the same // path. t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred()) @@ -1174,7 +1151,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }, } - index := newEtagIndex() + index := index.NewDigester() if tt.beforeFunc != nil { tt.beforeFunc(g, obj, index, tmpDir) @@ -1206,57 +1183,6 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { } } -func Test_etagIndex_Revision(t *testing.T) { - tests := []struct { - name string - list map[string]string - want string - wantErr bool - }{ - { - name: "index with items", - list: map[string]string{ - "one": "one", - "two": "two", - "three": "three", - }, - want: "c0837b3f32bb67c5275858fdb96595f87801cf3c2f622c049918a051d29b2c7f", - }, - { - name: "index with items in different order", - list: map[string]string{ - "three": "three", - "one": "one", - "two": "two", - }, - want: "c0837b3f32bb67c5275858fdb96595f87801cf3c2f622c049918a051d29b2c7f", - }, - { - name: "empty index", - list: map[string]string{}, - want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - }, - { - name: "nil index", - list: nil, - want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - index := &etagIndex{index: tt.list} - got, err := index.Revision() - if (err != nil) != tt.wantErr { - t.Errorf("revision() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("revision() got = %v, want %v", got, tt.want) - } - }) - } -} - func TestBucketReconciler_statusConditions(t *testing.T) { tests := []struct { name string @@ -1439,12 +1365,10 @@ func TestBucketReconciler_notify(t *testing.T) { EventRecorder: recorder, patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"), } - index := &etagIndex{ - index: map[string]string{ - "zzz": "qqq", - "bbb": "ddd", - }, - } + index := index.NewDigester(index.WithIndex(map[string]string{ + "zzz": "qqq", + "bbb": "ddd", + })) reconciler.notify(ctx, oldObj, newObj, index, tt.res, tt.resErr) select { diff --git a/internal/index/digest.go b/internal/index/digest.go new file mode 100644 index 00000000..1f7bd642 --- /dev/null +++ b/internal/index/digest.go @@ -0,0 +1,221 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package index + +import ( + "fmt" + "io" + "sort" + "strings" + "sync" + + "github.com/opencontainers/go-digest" +) + +// Digester is a simple string key value index that can be used to calculate +// digests of the index. The digests are cached, and only recalculated if the +// index has changed. +type Digester struct { + // index is the map of keys and their associated values. + index map[string]string + + // digests is a cache of digests calculated for the index. + digests map[digest.Algorithm]digest.Digest + + mu sync.RWMutex +} + +// DigesterOption is a functional option for configuring a digester. +type DigesterOption func(*Digester) + +// WithIndex returns a DigesterOption that sets the index to the provided map. +// The map is copied, so any changes to the map after the option is applied +// will not be reflected in the index. +func WithIndex(i map[string]string) DigesterOption { + return func(d *Digester) { + if i != nil { + d.mu.Lock() + defer d.mu.Unlock() + + if d.index == nil { + d.index = make(map[string]string, len(i)) + } + for k, v := range i { + d.index[k] = v + } + d.reset() + } + } +} + +// NewDigester returns a new digest index with an empty initialized index. +func NewDigester(opts ...DigesterOption) *Digester { + d := &Digester{ + digests: make(map[digest.Algorithm]digest.Digest, 0), + index: make(map[string]string, 0), + } + for _, opt := range opts { + opt(d) + } + return d +} + +// Add adds the key and digest to the index. +func (i *Digester) Add(key, value string) { + i.mu.Lock() + defer i.mu.Unlock() + + i.index[key] = value + i.reset() +} + +// Delete removes the key from the index. +func (i *Digester) Delete(key string) { + i.mu.Lock() + defer i.mu.Unlock() + + if _, ok := i.index[key]; ok { + delete(i.index, key) + i.reset() + } +} + +// Get returns the digest for the key, or an empty digest if the key is not +// found. +func (i *Digester) Get(key string) string { + i.mu.RLock() + defer i.mu.RUnlock() + + return i.index[key] +} + +// Has returns true if the index contains the key. +func (i *Digester) Has(key string) bool { + i.mu.RLock() + defer i.mu.RUnlock() + + _, ok := i.index[key] + return ok +} + +// Index returns a copy of the index. +func (i *Digester) Index() map[string]string { + i.mu.RLock() + defer i.mu.RUnlock() + + index := make(map[string]string, len(i.index)) + for k, v := range i.index { + index[k] = v + } + return index +} + +// Len returns the number of keys in the index. +func (i *Digester) Len() int { + i.mu.RLock() + defer i.mu.RUnlock() + return len(i.index) +} + +// String returns a string representation of the index. The keys are stable +// sorted, and the string representation of the key/value pairs is written, +// each pair on a newline with a space between them. +func (i *Digester) String() string { + i.mu.RLock() + defer i.mu.RUnlock() + + keys := i.sortedKeys() + var b strings.Builder + for _, k := range keys { + b.Grow(len(k) + len(i.index[k]) + 2) + writeLine(&b, k, i.index[k]) + } + return b.String() +} + +// WriteTo writes the index to the writer. The keys are stable sorted, and the +// string representation of the key/value pairs is written, each pair on a +// newline with a space between them. +func (i *Digester) WriteTo(w io.Writer) (int64, error) { + i.mu.RLock() + defer i.mu.RUnlock() + + keys := i.sortedKeys() + var n int64 + for _, k := range keys { + nn, err := writeLine(w, k, i.index[k]) + n += int64(nn) + if err != nil { + return n, err + } + } + return n, nil +} + +// Digest returns the digest of the index using the provided algorithm. +// If the index has not changed since the last call to Digest, the cached +// digest is returned. +// For verifying the index against a known digest, use Verify. +func (i *Digester) Digest(a digest.Algorithm) digest.Digest { + i.mu.Lock() + defer i.mu.Unlock() + + if _, ok := i.digests[a]; !ok { + digester := a.Digester() + keys := i.sortedKeys() + for _, k := range keys { + _, _ = writeLine(digester.Hash(), k, i.index[k]) + } + i.digests[a] = digester.Digest() + } + + return i.digests[a] +} + +// Verify returns true if the index matches the provided digest. +func (i *Digester) Verify(d digest.Digest) bool { + i.mu.RLock() + defer i.mu.RUnlock() + + verifier := d.Verifier() + keys := i.sortedKeys() + for _, k := range keys { + _, _ = writeLine(verifier, k, i.index[k]) + } + return verifier.Verified() +} + +// sortedKeys returns a slice of the keys in the index, sorted alphabetically. +func (i *Digester) sortedKeys() []string { + keys := make([]string, 0, len(i.index)) + for k := range i.index { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} + +// reset clears the digests cache. +func (i *Digester) reset() { + i.digests = make(map[digest.Algorithm]digest.Digest, 0) +} + +// writeLine writes the key and digest to the writer, separated by a space and +// terminating with a newline. +func writeLine(w io.Writer, key, value string) (int, error) { + return fmt.Fprintf(w, "%s %s\n", key, value) +} diff --git a/internal/index/digest_test.go b/internal/index/digest_test.go new file mode 100644 index 00000000..8afc4fd0 --- /dev/null +++ b/internal/index/digest_test.go @@ -0,0 +1,346 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package index + +import ( + "bytes" + "errors" + "testing" + + . "github.com/onsi/gomega" + "github.com/opencontainers/go-digest" +) + +func TestWithIndex(t *testing.T) { + t.Run("sets the index", func(t *testing.T) { + g := NewWithT(t) + + i := map[string]string{"foo": "bar"} + d := &Digester{} + WithIndex(i)(d) + + g.Expect(d.index).To(Equal(i)) + }) + + t.Run("resets the digests", func(t *testing.T) { + g := NewWithT(t) + + i := map[string]string{"foo": "bar"} + d := &Digester{ + digests: map[digest.Algorithm]digest.Digest{ + digest.SHA256: "sha256:foo", + }, + } + WithIndex(i)(d) + + g.Expect(d.digests).To(BeEmpty()) + }) +} + +func TestNewDigester(t *testing.T) { + t.Run("default", func(t *testing.T) { + g := NewWithT(t) + + d := NewDigester() + + g.Expect(d).ToNot(BeNil()) + g.Expect(d.index).ToNot(BeNil()) + g.Expect(d.digests).ToNot(BeNil()) + }) + + t.Run("with index", func(t *testing.T) { + g := NewWithT(t) + + i := map[string]string{"foo": "bar"} + d := NewDigester(WithIndex(i)) + + g.Expect(d).ToNot(BeNil()) + g.Expect(d.index).To(Equal(i)) + g.Expect(d.digests).ToNot(BeNil()) + }) +} + +func TestDigester_Add(t *testing.T) { + t.Run("adds", func(t *testing.T) { + g := NewWithT(t) + + d := NewDigester() + d.Add("foo", "bar") + + g.Expect(d.index).To(HaveKeyWithValue("foo", "bar")) + }) + + t.Run("overwrites", func(t *testing.T) { + g := NewWithT(t) + + d := NewDigester() + d.Add("foo", "bar") + d.Add("foo", "baz") + + g.Expect(d.index).To(HaveKeyWithValue("foo", "baz")) + }) + + t.Run("resets digests", func(t *testing.T) { + g := NewWithT(t) + + d := &Digester{ + index: map[string]string{}, + digests: map[digest.Algorithm]digest.Digest{ + digest.SHA256: "sha256:foo", + }, + } + d.Add("foo", "bar") + + g.Expect(d.digests).To(BeEmpty()) + }) +} + +func TestDigester_Delete(t *testing.T) { + t.Run("deletes", func(t *testing.T) { + g := NewWithT(t) + + d := NewDigester() + d.Add("foo", "bar") + d.Delete("foo") + + g.Expect(d.index).ToNot(HaveKey("foo")) + }) + + t.Run("resets digests", func(t *testing.T) { + g := NewWithT(t) + + d := &Digester{ + index: map[string]string{ + "foo": "bar", + }, + digests: map[digest.Algorithm]digest.Digest{ + digest.SHA256: "sha256:foo", + }, + } + + d.Delete("nop") + g.Expect(d.digests).To(HaveLen(1)) + + d.Delete("foo") + g.Expect(d.digests).To(BeEmpty()) + }) +} + +func TestDigester_Get(t *testing.T) { + g := NewWithT(t) + + d := NewDigester() + d.Add("foo", "bar") + + g.Expect(d.Get("foo")).To(Equal("bar")) + g.Expect(d.Get("bar")).To(BeEmpty()) +} + +func TestDigester_Has(t *testing.T) { + g := NewWithT(t) + + d := NewDigester() + d.Add("foo", "bar") + + g.Expect(d.Has("foo")).To(BeTrue()) + g.Expect(d.Has("bar")).To(BeFalse()) +} + +func TestDigester_Index(t *testing.T) { + g := NewWithT(t) + + i := map[string]string{ + "foo": "bar", + "bar": "baz", + } + d := NewDigester(WithIndex(i)) + + iCopy := d.Index() + g.Expect(iCopy).To(Equal(i)) + g.Expect(iCopy).ToNot(BeIdenticalTo(i)) +} + +func TestDigester_Len(t *testing.T) { + g := NewWithT(t) + + d := NewDigester(WithIndex(map[string]string{ + "foo": "bar", + "bar": "baz", + })) + + g.Expect(d.Len()).To(Equal(2)) +} + +func TestDigester_String(t *testing.T) { + g := NewWithT(t) + + d := NewDigester(WithIndex(map[string]string{ + "foo": "bar", + "bar": "baz", + })) + + g.Expect(d.String()).To(Equal(`bar baz +foo bar +`)) +} + +func TestDigester_WriteTo(t *testing.T) { + t.Run("writes", func(t *testing.T) { + g := NewWithT(t) + + d := NewDigester(WithIndex(map[string]string{ + "foo": "bar", + "bar": "baz", + })) + expect := `bar baz +foo bar +` + + var buf bytes.Buffer + n, err := d.WriteTo(&buf) + + g.Expect(n).To(Equal(int64(len(expect)))) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(buf.String()).To(Equal(expect)) + }) + + t.Run("errors", func(t *testing.T) { + g := NewWithT(t) + + d := NewDigester(WithIndex(map[string]string{ + "foo": "bar", + "bar": "baz", + })) + + w := &fakeWriter{ + err: errors.New("write error"), + written: 5, + } + n, err := d.WriteTo(w) + + g.Expect(err).To(HaveOccurred()) + g.Expect(errors.Is(err, w.err)).To(BeTrue()) + g.Expect(n).To(Equal(int64(w.written))) + }) +} + +func TestDigester_Digest(t *testing.T) { + t.Run("returns digest", func(t *testing.T) { + g := NewWithT(t) + + d := NewDigester(WithIndex(map[string]string{ + "foo": "bar", + "bar": "baz", + })) + expect := digest.SHA256.FromString(d.String()) + + g.Expect(d.Digest(digest.SHA256)).To(Equal(expect)) + g.Expect(d.digests).To(HaveKeyWithValue(digest.SHA256, expect)) + }) + + t.Run("returns cached digest", func(t *testing.T) { + g := NewWithT(t) + + d := &Digester{ + index: map[string]string{ + "foo": "bar", + "bar": "baz", + }, + digests: map[digest.Algorithm]digest.Digest{ + digest.SHA256: "sha256:foo", + }, + } + + g.Expect(d.Digest(digest.SHA256)).To(Equal(d.digests[digest.SHA256])) + }) +} + +func TestDigester_Verify(t *testing.T) { + g := NewWithT(t) + + d := NewDigester(WithIndex(map[string]string{ + "foo": "bar", + })) + + g.Expect(d.Verify(d.Digest(digest.SHA256))).To(BeTrue()) + g.Expect(d.Verify(digest.SHA256.FromString("different"))).To(BeFalse()) +} + +func TestDigester_sortedKeys(t *testing.T) { + g := NewWithT(t) + + d := NewDigester(WithIndex(map[string]string{ + "c/d/e": "bar", + "a/b/c": "baz", + "f/g/h": "foo", + })) + + g.Expect(d.sortedKeys()).To(Equal([]string{ + "a/b/c", + "c/d/e", + "f/g/h", + })) +} + +func TestDigester_reset(t *testing.T) { + g := NewWithT(t) + + d := NewDigester() + d.digests = map[digest.Algorithm]digest.Digest{ + digest.SHA256: "sha256:foo", + } + + d.reset() + g.Expect(d.digests).To(BeEmpty()) +} + +func Test_writeLine(t *testing.T) { + t.Run("writes", func(t *testing.T) { + g := NewWithT(t) + + var buf bytes.Buffer + n, err := writeLine(&buf, "foo", "bar") + + g.Expect(n).To(Equal(8)) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(buf.String()).To(Equal(`foo bar +`)) + }) + + t.Run("errors", func(t *testing.T) { + g := NewWithT(t) + + w := &fakeWriter{ + err: errors.New("write error"), + written: 5, + } + n, err := writeLine(w, "foo", "bar") + + g.Expect(err).To(HaveOccurred()) + g.Expect(errors.Is(err, w.err)).To(BeTrue()) + g.Expect(n).To(Equal(w.written)) + }) +} + +type fakeWriter struct { + written int + err error +} + +func (f *fakeWriter) Write(p []byte) (n int, err error) { + return f.written, f.err +}