From ed6c6ebc3c9fb3f40029e492f1c3ef5d191a65ba Mon Sep 17 00:00:00 2001 From: pa250194 Date: Tue, 19 Oct 2021 12:13:37 -0500 Subject: [PATCH] Introduce BucketProvider interface This commit introduces a BucketProvider interface for fetch operations against object storage provider buckets, allowing for easier introduction of new provider implementations. The algorithm for conditionally downloading object files is the same whether you are using GCP storage or an S3/Minio-compatible bucket. The only thing that differs is how the respective clients handle enumerating through the objects in the bucket; by implementing just that in each provider, I can have the select-and-fetch code in one place. The client implementations now include safeguards to ensure the fetched object is the same one the metadata was collected for. In addition, minor changes have been made to the object fetch operation to take into account that: - Etags can change between composition of the index and the actual fetch, in which case the etag is now updated. - Objects can disappear between composition of the index and the actual fetch, in which case the item is removed from the index. Lastly, the requirement for authentication has been removed (and not referring to a Secret at all is thus allowed), to provide support for e.g. public buckets. Co-authored-by: Hidde Beydals Co-authored-by: Michael Bridgen Signed-off-by: pa250194 --- controllers/bucket_controller.go | 691 +++++++++----------- controllers/bucket_controller_fetch_test.go | 322 +++++++++ controllers/bucket_controller_test.go | 594 +++++------ go.mod | 2 +- internal/mock/gcs/server.go | 220 +++++++ internal/mock/s3/server.go | 157 +++++ pkg/gcp/gcp.go | 146 +++-- pkg/gcp/gcp_test.go | 161 +++-- pkg/minio/minio.go | 135 ++++ pkg/minio/minio_test.go | 283 ++++++++ 10 files changed, 1771 insertions(+), 940 deletions(-) create mode 100644 controllers/bucket_controller_fetch_test.go create mode 100644 internal/mock/gcs/server.go create mode 100644 internal/mock/s3/server.go create mode 100644 pkg/minio/minio.go create mode 100644 pkg/minio/minio_test.go diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index 324cf46e..6ea57b81 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -25,17 +25,11 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" - gcpstorage "cloud.google.com/go/storage" - "github.com/fluxcd/pkg/runtime/events" - "github.com/fluxcd/source-controller/pkg/gcp" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - "github.com/minio/minio-go/v7/pkg/s3utils" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" - "google.golang.org/api/option" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -49,6 +43,7 @@ import ( "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" @@ -56,9 +51,23 @@ import ( serror "github.com/fluxcd/source-controller/internal/error" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" + "github.com/fluxcd/source-controller/pkg/gcp" + "github.com/fluxcd/source-controller/pkg/minio" "github.com/fluxcd/source-controller/pkg/sourceignore" ) +// maxConcurrentBucketFetches is the upper bound on the goroutines used to +//
fetch bucket objects. It's important to have a bound, to avoid +// using arbitrary amounts of memory; the actual number is chosen +// according to the queueing rule of thumb with some conservative +// parameters: +// s > Nr / T +// N (number of requestors, i.e., objects to fetch) = 10000 +// r (service time -- fetch duration) = 0.01s (~ a megabyte file over 1Gb/s) +// T (total time available) = 1s +// -> s > 100 +const maxConcurrentBucketFetches = 100 + // bucketReadyConditions contains all the conditions information needed // for Bucket Ready status conditions summary calculation. var bucketReadyConditions = summarize.Conditions{ @@ -103,9 +112,107 @@ type BucketReconcilerOptions struct { MaxConcurrentReconciles int } +// BucketProvider is an interface for fetching objects from a storage provider +// bucket. +type BucketProvider interface { + // BucketExists returns if an object storage bucket with the provided name + // exists, or returns a (client) error. + BucketExists(ctx context.Context, bucketName string) (bool, error) + // FGetObject gets the object from the provided object storage bucket, and + // writes it to targetPath. + // It returns the etag of the successfully fetched file, or any error. + FGetObject(ctx context.Context, bucketName, objectKey, targetPath string) (etag string, err error) + // VisitObjects iterates over the items in the provided object storage + // bucket, calling visit for every item. + // If the underlying client or the visit callback returns an error, + // it returns early. + VisitObjects(ctx context.Context, bucketName string, visit func(key, etag string) error) error + // ObjectIsNotFound returns true if the given error indicates an object + // could not be found. + ObjectIsNotFound(error) bool + // Close closes the provider's client, if supported. + Close(context.Context) +} + // bucketReconcilerFunc is the function type for all the bucket reconciler // functions. -type bucketReconcilerFunc func(ctx context.Context, obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) +type bucketReconcilerFunc func(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) + +// etagIndex is an index of storage object keys and their Etag values. +type etagIndex struct { + sync.RWMutex + index map[string]string +} + +// newEtagIndex returns a new etagIndex with an empty initialized index. +func newEtagIndex() *etagIndex { + return &etagIndex{ + index: make(map[string]string), + } +} + +func (i *etagIndex) Add(key, etag string) { + i.Lock() + defer i.Unlock() + i.index[key] = etag +} + +func (i *etagIndex) Delete(key string) { + i.Lock() + defer i.Unlock() + delete(i.index, key) +} + +func (i *etagIndex) Get(key string) string { + i.RLock() + defer i.RUnlock() + return i.index[key] +} + +func (i *etagIndex) Has(key string) bool { + i.RLock() + defer i.RUnlock() + _, ok := i.index[key] + return ok +} + +func (i *etagIndex) Index() map[string]string { + i.RLock() + defer i.RUnlock() + index := make(map[string]string) + for k, v := range i.index { + index[k] = v + } + return index +} + +func (i *etagIndex) Len() int { + i.RLock() + defer i.RUnlock() + return len(i.index) +} + +// Revision calculates the SHA256 checksum of the index. +// The keys are stable sorted, and the SHA256 sum is then calculated for the +// string representation of the key/value pairs, each pair written on a newline +// with a space between them. The sum result is returned as a string. 
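+// For example, an index with entries {"a.yaml": "etag1", "b.yaml": "etag2"} is hashed over the pre-image "a.yaml etag1\nb.yaml etag2\n", so any added, removed, or changed key or etag produces a new revision.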
+func (i *etagIndex) Revision() (string, error) { + i.RLock() + defer i.RUnlock() + keyIndex := make([]string, 0, len(i.index)) + for k := range i.index { + keyIndex = append(keyIndex, k) + } + + sort.Strings(keyIndex) + sum := sha256.New() + for _, k := range keyIndex { + if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i.index[k]))); err != nil { + return "", err + } + } + return fmt.Sprintf("%x", sum.Sum(nil)), nil +} func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error { return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{}) @@ -201,9 +308,6 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) } - index := make(etagIndex) - var artifact sourcev1.Artifact - // Create temp working dir tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name)) if err != nil { @@ -215,10 +319,14 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, defer os.RemoveAll(tmpDir) // Run the sub-reconcilers and build the result of reconciliation. - var res sreconcile.Result - var resErr error + var ( + res sreconcile.Result + resErr error + index = newEtagIndex() + ) + for _, rec := range reconcilers { - recResult, err := rec(ctx, obj, index, &artifact, tmpDir) + recResult, err := rec(ctx, obj, index, tmpDir) // Exit immediately on ResultRequeue. if recResult == sreconcile.ResultRequeue { return sreconcile.ResultRequeue, nil @@ -241,8 +349,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, // All artifacts for the resource except for the current one are garbage collected from the storage. // If the artifact in the Status object of the resource disappeared from storage, it is removed from the object. // If the hostname of the URLs on the object do not match the current storage server hostname, they are updated. -func (r *BucketReconciler) reconcileStorage(ctx context.Context, - obj *sourcev1.Bucket, _ etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) { +func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) { // Garbage collect previous advertised artifact(s) from storage _ = r.garbageCollect(ctx, obj) @@ -270,335 +377,84 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, // result. // If a SecretRef is defined, it attempts to fetch the Secret before calling the provider. If the fetch of the Secret // fails, it records v1beta1.FetchFailedCondition=True and returns early. 
-func (r *BucketReconciler) reconcileSource(ctx context.Context, - obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) { - var secret *corev1.Secret - if obj.Spec.SecretRef != nil { - secretName := types.NamespacedName{ - Namespace: obj.GetNamespace(), - Name: obj.Spec.SecretRef.Name, - } - secret = &corev1.Secret{} - if err := r.Get(ctx, secretName, secret); err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to get secret '%s': %w", secretName.String(), err), - Reason: sourcev1.AuthenticationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, e.Err.Error()) - // Return error as the world as observed may change - return sreconcile.ResultEmpty, e - } +func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) { + secret, err := r.getBucketSecret(ctx, obj) + if err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + // Return error as the world as observed may change + return sreconcile.ResultEmpty, e } + // Construct provider client + var provider BucketProvider switch obj.Spec.Provider { case sourcev1.GoogleBucketProvider: - return r.reconcileGCPSource(ctx, obj, index, artifact, secret, dir) + if err = gcp.ValidateSecret(secret); err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e + } + if provider, err = gcp.NewClient(ctx, secret); err != nil { + e := &serror.Event{Err: err, Reason: "ClientError"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e + } default: - return r.reconcileMinioSource(ctx, obj, index, artifact, secret, dir) - } -} - -// reconcileMinioSource ensures the upstream Minio client compatible bucket can be reached and downloaded from using the -// declared configuration, and observes its state. -// -// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into -// account. In case of an error during the download process (including transient errors), it records -// v1beta1.FetchFailedCondition=True and returns early. -// On a successful download, it removes v1beta1.FetchFailedCondition, and compares the current revision of HEAD to -// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ. -// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata. 
-func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, - obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, secret *corev1.Secret, dir string) (sreconcile.Result, error) { - // Build the client with the configuration from the object and secret - s3Client, err := r.buildMinioClient(obj, secret) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to construct S3 client: %w", err), - Reason: sourcev1.BucketOperationFailedReason, + if err = minio.ValidateSecret(secret); err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - // Return error as the contents of the secret may change - return sreconcile.ResultEmpty, e - } - - // Confirm bucket exists - ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) - defer cancel() - exists, err := s3Client.BucketExists(ctxTimeout, obj.Spec.BucketName) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to verify existence of bucket '%s': %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - if !exists { - e := &serror.Event{ - Err: fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - - // Look for file with ignore rules first - path := filepath.Join(dir, sourceignore.IgnoreFile) - if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil { - if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" { - e := &serror.Event{ - Err: fmt.Errorf("failed to get '%s' file: %w", sourceignore.IgnoreFile, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) + if provider, err = minio.NewClient(obj, secret); err != nil { + e := &serror.Event{Err: err, Reason: "ClientError"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) return sreconcile.ResultEmpty, e } } - ps, err := sourceignore.ReadIgnoreFile(path, nil) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to read '%s' file: %w", sourceignore.IgnoreFile, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) + + // Fetch etag index + if err = fetchEtagIndex(ctx, provider, obj, index, dir); err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.BucketOperationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) return sreconcile.ResultEmpty, e } - // In-spec patterns take precedence - if obj.Spec.Ignore != nil { - ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...) 
- } - matcher := sourceignore.NewMatcher(ps) - // Build up an index of object keys and their etags - // As the keys define the paths and the etags represent a change in file contents, this should be sufficient to - // detect both structural and file changes - for object := range s3Client.ListObjects(ctxTimeout, obj.Spec.BucketName, minio.ListObjectsOptions{ - Recursive: true, - UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()), - }) { - if err = object.Err; err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to list objects from bucket '%s': %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - - // Ignore directories and the .sourceignore file - if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile { - continue - } - // Ignore matches - if matcher.Match(strings.Split(object.Key, "/"), false) { - continue - } - - index[object.Key] = object.ETag - } - - // Calculate revision checksum from the collected index values + // Calculate revision revision, err := index.Revision() if err != nil { - ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision") return sreconcile.ResultEmpty, &serror.Event{ Err: fmt.Errorf("failed to calculate revision: %w", err), Reason: meta.FailedReason, } } - if !obj.GetArtifact().HasRevision(revision) { - // Mark observations about the revision on the object - message := fmt.Sprintf("new upstream revision '%s'", revision) - conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message) - conditions.MarkReconciling(obj, "NewRevision", message) - - // Download the files in parallel, but with a limited number of workers - group, groupCtx := errgroup.WithContext(ctx) - group.Go(func() error { - const workers = 4 - sem := semaphore.NewWeighted(workers) - for key := range index { - k := key - if err := sem.Acquire(groupCtx, 1); err != nil { - return err - } - group.Go(func() error { - defer sem.Release(1) - localPath := filepath.Join(dir, k) - if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath, minio.GetObjectOptions{}); err != nil { - return fmt.Errorf("failed to get '%s' file: %w", k, err) - } - return nil - }) - } - return nil - }) - if err = group.Wait(); err != nil { - e := &serror.Event{ - Err: fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - } - conditions.Delete(obj, sourcev1.FetchFailedCondition) - - // Create potential new artifact - *artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision)) - return sreconcile.ResultSuccess, nil -} - -// reconcileGCPSource ensures the upstream Google Cloud Storage bucket can be reached and downloaded from using the -// declared configuration, and observes its state. -// -// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into -// account. In case of an error during the download process (including transient errors), it records -// v1beta1.DownloadFailedCondition=True and returns early. 
-// On a successful download, it removes v1beta1.DownloadFailedCondition, and compares the current revision of HEAD to -// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ. -// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata. -func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, - obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, secret *corev1.Secret, dir string) (sreconcile.Result, error) { - gcpClient, err := r.buildGCPClient(ctx, secret) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to construct GCP client: %w", err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - // Return error as the contents of the secret may change - return sreconcile.ResultEmpty, e - } - defer gcpClient.Close(ctrl.LoggerFrom(ctx)) - - // Confirm bucket exists - ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) - defer cancel() - exists, err := gcpClient.BucketExists(ctxTimeout, obj.Spec.BucketName) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to verify existence of bucket '%s': %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - if !exists { - e := &serror.Event{ - Err: fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - - // Look for file with ignore rules first - path := filepath.Join(dir, sourceignore.IgnoreFile) - if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { - if err != gcpstorage.ErrObjectNotExist { - e := &serror.Event{ - Err: fmt.Errorf("failed to get '%s' file: %w", sourceignore.IgnoreFile, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - } - ps, err := sourceignore.ReadIgnoreFile(path, nil) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to read '%s' file: %w", sourceignore.IgnoreFile, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - // In-spec patterns take precedence - if obj.Spec.Ignore != nil { - ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...) 
- } - matcher := sourceignore.NewMatcher(ps) - - // Build up an index of object keys and their etags - // As the keys define the paths and the etags represent a change in file contents, this should be sufficient to - // detect both structural and file changes - objects := gcpClient.ListObjects(ctxTimeout, obj.Spec.BucketName, nil) - for { - object, err := objects.Next() + // Mark observations about the revision on the object + defer func() { + // As fetchIndexFiles can make last-minute modifications to the etag + // index, we need to re-calculate the revision at the end + revision, err := index.Revision() if err != nil { - if err == gcp.IteratorDone { - break - } - e := &serror.Event{ - Err: fmt.Errorf("failed to list objects from bucket '%s': %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e + ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision after fetching etag index") + return } - if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile { - continue + if !obj.GetArtifact().HasRevision(revision) { + message := fmt.Sprintf("new upstream revision '%s'", revision) + conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message) + conditions.MarkReconciling(obj, "NewRevision", message) } - - if matcher.Match(strings.Split(object.Name, "/"), false) { - continue - } - - index[object.Name] = object.Etag - } - - // Calculate revision checksum from the collected index values - revision, err := index.Revision() - if err != nil { - return sreconcile.ResultEmpty, &serror.Event{ - Err: fmt.Errorf("failed to calculate revision: %w", err), - Reason: meta.FailedReason, - } - } + }() if !obj.GetArtifact().HasRevision(revision) { - // Mark observations about the revision on the object - message := fmt.Sprintf("new upstream revision '%s'", revision) - conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message) - conditions.MarkReconciling(obj, "NewRevision", message) - - // Download the files in parallel, but with a limited number of workers - group, groupCtx := errgroup.WithContext(ctx) - group.Go(func() error { - const workers = 4 - sem := semaphore.NewWeighted(workers) - for key := range index { - k := key - if err := sem.Acquire(groupCtx, 1); err != nil { - return err - } - group.Go(func() error { - defer sem.Release(1) - localPath := filepath.Join(dir, k) - if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath); err != nil { - return fmt.Errorf("failed to get '%s' file: %w", k, err) - } - return nil - }) - } - return nil - }) - if err = group.Wait(); err != nil { - e := &serror.Event{ - Err: fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) + if err = fetchIndexFiles(ctx, provider, obj, index, dir); err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.BucketOperationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) return sreconcile.ResultEmpty, e } } - conditions.Delete(obj, sourcev1.FetchFailedCondition) - // Create potential new artifact - *artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision)) + conditions.Delete(obj, 
sourcev1.FetchFailedCondition) return sreconcile.ResultSuccess, nil } @@ -609,8 +465,19 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, // If the given artifact does not differ from the object's current, it returns early. // On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is // updated to its path. -func (r *BucketReconciler) reconcileArtifact(ctx context.Context, - obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) { +func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) { + // Calculate revision + revision, err := index.Revision() + if err != nil { + return sreconcile.ResultEmpty, &serror.Event{ + Err: fmt.Errorf("failed to calculate revision of new artifact: %w", err), + Reason: meta.FailedReason, + } + } + + // Create artifact + artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision)) + // Always restore the Ready condition in case it got removed due to a transient error defer func() { if obj.GetArtifact().HasRevision(artifact.Revision) { @@ -640,13 +507,13 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, } // Ensure artifact directory exists and acquire lock - if err := r.Storage.MkdirAll(*artifact); err != nil { + if err := r.Storage.MkdirAll(artifact); err != nil { return sreconcile.ResultEmpty, &serror.Event{ Err: fmt.Errorf("failed to create artifact directory: %w", err), Reason: sourcev1.StorageOperationFailedReason, } } - unlock, err := r.Storage.Lock(*artifact) + unlock, err := r.Storage.Lock(artifact) if err != nil { return sreconcile.ResultEmpty, &serror.Event{ Err: fmt.Errorf("failed to acquire lock for artifact: %w", err), @@ -656,7 +523,7 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, defer unlock() // Archive directory to storage - if err := r.Storage.Archive(artifact, dir, nil); err != nil { + if err := r.Storage.Archive(&artifact, dir, nil); err != nil { return sreconcile.ResultEmpty, &serror.Event{ Err: fmt.Errorf("unable to archive artifact to storage: %s", err), Reason: sourcev1.StorageOperationFailedReason, @@ -665,13 +532,13 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, r.annotatedEventLogf(ctx, obj, map[string]string{ "revision": artifact.Revision, "checksum": artifact.Checksum, - }, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", len(index), obj.Spec.BucketName) + }, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName) // Record it on the object obj.Status.Artifact = artifact.DeepCopy() // Update symlink on a "best effort" basis - url, err := r.Storage.Symlink(*artifact, "latest.tar.gz") + url, err := r.Storage.Symlink(artifact, "latest.tar.gz") if err != nil { r.eventLogf(ctx, obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason, "failed to update status URL symlink: %s", err) @@ -729,74 +596,21 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc return nil } -// buildMinioClient constructs a minio.Client with the data from the given object and Secret. -// It returns an error if the Secret does not have the required fields, or if there is no credential handler -// configured. 
-func (r *BucketReconciler) buildMinioClient(obj *sourcev1.Bucket, secret *corev1.Secret) (*minio.Client, error) { - opts := minio.Options{ - Region: obj.Spec.Region, - Secure: !obj.Spec.Insecure, +// getBucketSecret attempts to fetch the Secret reference if specified on the +// obj. It returns any client error. +func (r *BucketReconciler) getBucketSecret(ctx context.Context, obj *sourcev1.Bucket) (*corev1.Secret, error) { + if obj.Spec.SecretRef == nil { + return nil, nil } - if secret != nil { - var accessKey, secretKey string - if k, ok := secret.Data["accesskey"]; ok { - accessKey = string(k) - } - if k, ok := secret.Data["secretkey"]; ok { - secretKey = string(k) - } - if accessKey == "" || secretKey == "" { - return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) - } - opts.Creds = credentials.NewStaticV4(accessKey, secretKey, "") - } else if obj.Spec.Provider == sourcev1.AmazonBucketProvider { - opts.Creds = credentials.NewIAM("") + secretName := types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.Spec.SecretRef.Name, } - return minio.New(obj.Spec.Endpoint, &opts) -} - -// buildGCPClient constructs a gcp.GCPClient with the data from the given Secret. -// It returns an error if the Secret does not have the required field, or if the client construction fails. -func (r *BucketReconciler) buildGCPClient(ctx context.Context, secret *corev1.Secret) (*gcp.GCPClient, error) { - var client *gcp.GCPClient - var err error - if secret != nil { - if err := gcp.ValidateSecret(secret.Data, secret.Name); err != nil { - return nil, err - } - client, err = gcp.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"])) - if err != nil { - return nil, err - } - } else { - client, err = gcp.NewClient(ctx) - if err != nil { - return nil, err - } + secret := &corev1.Secret{} + if err := r.Get(ctx, secretName, secret); err != nil { + return nil, fmt.Errorf("failed to get secret '%s': %w", secretName.String(), err) } - return client, nil -} - -// etagIndex is an index of bucket keys and their Etag values. -type etagIndex map[string]string - -// Revision calculates the SHA256 checksum of the index. -// The keys are sorted to ensure a stable order, and the SHA256 sum is then calculated for the string representations of -// the key/value pairs, each pair written on a newline -// The sum result is returned as a string. -func (i etagIndex) Revision() (string, error) { - keyIndex := make([]string, 0, len(i)) - for k := range i { - keyIndex = append(keyIndex, k) - } - sort.Strings(keyIndex) - sum := sha256.New() - for _, k := range keyIndex { - if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i[k]))); err != nil { - return "", err - } - } - return fmt.Sprintf("%x", sum.Sum(nil)), nil + return secret, nil } // eventLogf records event and logs at the same time. @@ -819,3 +633,106 @@ func (r *BucketReconciler) annotatedEventLogf(ctx context.Context, } r.AnnotatedEventf(obj, annotations, eventType, reason, msg) } + +// fetchEtagIndex fetches the current etagIndex for the in the obj specified +// bucket using the given provider, while filtering them using .sourceignore +// rules. After fetching an object, the etag value in the index is updated to +// the current value to ensure accuracy. 
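+// (The index itself is built here via VisitObjects; the per-object etag refresh on fetch, and the removal of objects that have disappeared, happen in fetchIndexFiles below.)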
+func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error { + ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) + defer cancel() + + // Confirm bucket exists + exists, err := provider.BucketExists(ctxTimeout, obj.Spec.BucketName) + if err != nil { + return fmt.Errorf("failed to confirm existence of '%s' bucket: %w", obj.Spec.BucketName, err) + } + if !exists { + err = fmt.Errorf("bucket '%s' not found", obj.Spec.BucketName) + return err + } + + // Look for file with ignore rules first + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + if _, err := provider.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { + if !provider.ObjectIsNotFound(err) { + return err + } + } + ps, err := sourceignore.ReadIgnoreFile(path, nil) + if err != nil { + return err + } + // In-spec patterns take precedence + if obj.Spec.Ignore != nil { + ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...) + } + matcher := sourceignore.NewMatcher(ps) + + // Build up index + err = provider.VisitObjects(ctxTimeout, obj.Spec.BucketName, func(key, etag string) error { + if strings.HasSuffix(key, "/") || key == sourceignore.IgnoreFile { + return nil + } + + if matcher.Match(strings.Split(key, "/"), false) { + return nil + } + + index.Add(key, etag) + return nil + }) + if err != nil { + return fmt.Errorf("indexation of objects from bucket '%s' failed: %w", obj.Spec.BucketName, err) + } + return nil +} + +// fetchIndexFiles fetches the object files for the keys from the given etagIndex +// using the given provider, and stores them into tempDir. It downloads in +// parallel, but limited to the maxConcurrentBucketFetches. +// Given an index is provided, the bucket is assumed to exist. +func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error { + ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) + defer cancel() + + // Download in parallel, but bound the concurrency. According to + // AWS and GCP docs, rate limits are either soft or don't exist: + // - https://cloud.google.com/storage/quotas + // - https://docs.aws.amazon.com/general/latest/gr/s3.html + // .. so, the limiting factor is this process keeping a small footprint. 
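+ // The outer errgroup goroutine below pumps the index: it acquires one semaphore slot per object before spawning the goroutine that fetches the object and releases the slot, so group.Wait() returns only once the pump and every in-flight fetch have finished.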
+ group, groupCtx := errgroup.WithContext(ctx) + group.Go(func() error { + sem := semaphore.NewWeighted(maxConcurrentBucketFetches) + for key, etag := range index.Index() { + k := key + t := etag + if err := sem.Acquire(groupCtx, 1); err != nil { + return err + } + group.Go(func() error { + defer sem.Release(1) + localPath := filepath.Join(tempDir, k) + etag, err := provider.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath) + if err != nil { + if provider.ObjectIsNotFound(err) { + ctrl.LoggerFrom(ctx).Info(fmt.Sprintf("indexed object '%s' disappeared from '%s' bucket", k, obj.Spec.BucketName)) + index.Delete(k) + return nil + } + return fmt.Errorf("failed to get '%s' object: %w", k, err) + } + if t != etag { + index.Add(k, etag) + } + return nil + }) + } + return nil + }) + if err := group.Wait(); err != nil { + return fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err) + } + + return nil +} diff --git a/controllers/bucket_controller_fetch_test.go b/controllers/bucket_controller_fetch_test.go new file mode 100644 index 00000000..acaa7e74 --- /dev/null +++ b/controllers/bucket_controller_fetch_test.go @@ -0,0 +1,322 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "gotest.tools/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" +) + +type mockBucketObject struct { + etag string + data string +} + +type mockBucketClient struct { + bucketName string + objects map[string]mockBucketObject +} + +var mockNotFound = fmt.Errorf("not found") + +func (m mockBucketClient) BucketExists(_ context.Context, name string) (bool, error) { + return name == m.bucketName, nil +} + +func (m mockBucketClient) FGetObject(_ context.Context, bucket, obj, path string) (string, error) { + if bucket != m.bucketName { + return "", fmt.Errorf("bucket does not exist") + } + // tiny bit of protocol, for convenience: if asked for an object "error", then return an error. 
+ if obj == "error" { + return "", fmt.Errorf("I was asked to report an error") + } + object, ok := m.objects[obj] + if !ok { + return "", mockNotFound + } + if err := os.WriteFile(path, []byte(object.data), os.FileMode(0660)); err != nil { + return "", err + } + return object.etag, nil +} + +func (m mockBucketClient) ObjectIsNotFound(e error) bool { + return e == mockNotFound +} + +func (m mockBucketClient) VisitObjects(_ context.Context, _ string, f func(key, etag string) error) error { + for key, obj := range m.objects { + if err := f(key, obj.etag); err != nil { + return err + } + } + return nil +} + +func (m mockBucketClient) Close(_ context.Context) { + return +} + +func (m *mockBucketClient) addObject(key string, object mockBucketObject) { + if m.objects == nil { + m.objects = make(map[string]mockBucketObject) + } + m.objects[key] = object +} + +func (m *mockBucketClient) objectsToEtagIndex() *etagIndex { + i := newEtagIndex() + for k, v := range m.objects { + i.Add(k, v.etag) + } + return i +} + +func Test_fetchEtagIndex(t *testing.T) { + bucketName := "all-my-config" + + bucket := sourcev1.Bucket{ + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Timeout: &metav1.Duration{Duration: 1 * time.Hour}, + }, + } + + t.Run("fetches etag index", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"}) + client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"}) + client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"}) + + index := newEtagIndex() + err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, index.Len(), 3) + }) + + t.Run("an error while bucket does not exist", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: "other-bucket-name"} + + index := newEtagIndex() + err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) + assert.ErrorContains(t, err, "not found") + }) + + t.Run("filters with .sourceignore rules", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject(".sourceignore", mockBucketObject{etag: "sourceignore1", data: `*.txt`}) + client.addObject("foo.yaml", mockBucketObject{etag: "etag1", data: "foo.yaml"}) + client.addObject("foo.txt", mockBucketObject{etag: "etag2", data: "foo.txt"}) + + index := newEtagIndex() + err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + + if _, err := os.Stat(filepath.Join(tmp, ".sourceignore")); err != nil { + t.Error(err) + } + + if ok := index.Has("foo.txt"); ok { + t.Error(fmt.Errorf("expected 'foo.txt' index item to not exist")) + } + assert.Equal(t, index.Len(), 1) + }) + + t.Run("filters with ignore rules from object", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject(".sourceignore", mockBucketObject{etag: "sourceignore1", data: `*.txt`}) + client.addObject("foo.txt", mockBucketObject{etag: "etag1", 
data: "foo.txt"}) + + ignore := "!*.txt" + bucket := bucket.DeepCopy() + bucket.Spec.Ignore = &ignore + + index := newEtagIndex() + err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + + if _, err := os.Stat(filepath.Join(tmp, ".sourceignore")); err != nil { + t.Error(err) + } + + assert.Equal(t, index.Len(), 1) + if ok := index.Has("foo.txt"); !ok { + t.Error(fmt.Errorf("expected 'foo.txt' index item to exist")) + } + }) +} + +func Test_fetchFiles(t *testing.T) { + bucketName := "all-my-config" + + bucket := sourcev1.Bucket{ + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Timeout: &metav1.Duration{Duration: 1 * time.Hour}, + }, + } + + t.Run("fetches files", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"}) + client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"}) + client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"}) + + index := client.objectsToEtagIndex() + + err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + + for path := range index.Index() { + p := filepath.Join(tmp, path) + _, err := os.Stat(p) + if err != nil { + t.Error(err) + } + } + }) + + t.Run("an error while fetching returns an error for the whole procedure", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName, objects: map[string]mockBucketObject{}} + client.objects["error"] = mockBucketObject{} + + err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), client.objectsToEtagIndex(), tmp) + if err == nil { + t.Fatal("expected error but got nil") + } + }) + + t.Run("a changed etag updates the index", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag2"}) + + index := newEtagIndex() + index.Add("foo.yaml", "etag1") + err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + f := index.Get("foo.yaml") + assert.Equal(t, f, "etag2") + }) + + t.Run("a disappeared index entry is removed from the index", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"}) + + index := newEtagIndex() + index.Add("foo.yaml", "etag1") + // Does not exist on server + index.Add("bar.yaml", "etag2") + + err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + f := index.Get("foo.yaml") + assert.Equal(t, f, "etag1") + assert.Check(t, !index.Has("bar.yaml")) + }) + + t.Run("can fetch more than maxConcurrentFetches", func(t *testing.T) { + // this will fail if, for example, the semaphore is not used correctly and blocks + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + for i 
:= 0; i < 2*maxConcurrentBucketFetches; i++ { + f := fmt.Sprintf("file-%d", i) + client.addObject(f, mockBucketObject{etag: f, data: f}) + } + index := client.objectsToEtagIndex() + + err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + }) +} diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go index 3ff729f3..060b6e12 100644 --- a/controllers/bucket_controller_test.go +++ b/controllers/bucket_controller_test.go @@ -18,14 +18,10 @@ package controllers import ( "context" - "crypto/md5" - "encoding/json" "fmt" "net/http" - "net/http/httptest" "net/url" "os" - "path" "path/filepath" "strings" "testing" @@ -36,7 +32,6 @@ import ( "github.com/fluxcd/pkg/runtime/conditions" "github.com/fluxcd/pkg/runtime/patch" . "github.com/onsi/gomega" - raw "google.golang.org/api/storage/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,17 +41,19 @@ import ( fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + gcsmock "github.com/fluxcd/source-controller/internal/mock/gcs" + s3mock "github.com/fluxcd/source-controller/internal/mock/s3" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" ) // Environment variable to set the GCP Storage host for the GCP client. -const ENV_GCP_STORAGE_HOST = "STORAGE_EMULATOR_HOST" +const EnvGcpStorageHost = "STORAGE_EMULATOR_HOST" func TestBucketReconciler_Reconcile(t *testing.T) { g := NewWithT(t) - s3Server := newS3Server("test-bucket") - s3Server.Objects = []*s3MockObject{ + s3Server := s3mock.NewServer("test-bucket") + s3Server.Objects = []*s3mock.Object{ { Key: "test.yaml", Content: []byte("test"), @@ -274,10 +271,9 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed()) } - index := make(etagIndex) - var artifact sourcev1.Artifact + index := newEtagIndex() - got, err := r.reconcileStorage(context.TODO(), obj, index, &artifact, "") + got, err := r.reconcileStorage(context.TODO(), obj, index, "") g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(got).To(Equal(tt.want)) @@ -299,23 +295,23 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { } } -func TestBucketReconciler_reconcileMinioSource(t *testing.T) { +func TestBucketReconciler_reconcileSource_generic(t *testing.T) { tests := []struct { name string bucketName string - bucketObjects []*s3MockObject + bucketObjects []*s3mock.Object middleware http.Handler secret *corev1.Secret beforeFunc func(obj *sourcev1.Bucket) want sreconcile.Result wantErr bool - assertArtifact sourcev1.Artifact + assertIndex *etagIndex assertConditions []metav1.Condition }{ { - name: "reconciles source", + name: "Reconciles GCS source", bucketName: "dummy", - bucketObjects: []*s3MockObject{ + bucketObjects: []*s3mock.Object{ { Key: "test.txt", Content: []byte("test"), @@ -324,13 +320,14 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz", - Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8", + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, 
"NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, // TODO(hidde): middleware for mock server @@ -339,20 +336,21 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { // bucketName: "dummy", //}, { - name: "observes non-existing secretRef", + name: "Observes non-existing secretRef", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.SecretRef = &meta.LocalObjectReference{ Name: "dummy", } }, - wantErr: true, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"), }, }, { - name: "observes invalid secretRef", + name: "Observes invalid secretRef", bucketName: "dummy", secret: &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -364,38 +362,40 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { Name: "dummy", } }, - wantErr: true, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to construct S3 client: invalid 'dummy' secret data: required fields"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid 'dummy' secret data: required fields 'accesskey' and 'secretkey'"), }, }, { - name: "observes non-existing bucket name", + name: "Observes non-existing bucket name", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.BucketName = "invalid" }, - wantErr: true, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' does not exist"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' not found"), }, }, { - name: "transient bucket name API failure", + name: "Transient bucket name API failure", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.Endpoint = "transient.example.com" obj.Spec.BucketName = "unavailable" }, - wantErr: true, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to verify existence of bucket 'unavailable'"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to confirm existence of 'unavailable' bucket"), }, }, { - // TODO(hidde): test the lesser happy paths name: ".sourceignore", bucketName: "dummy", - bucketObjects: []*s3MockObject{ + bucketObjects: []*s3mock.Object{ { Key: ".sourceignore", Content: []byte("ignored/file.txt"), @@ -416,23 +416,24 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: 
"bucket/test-bucket/94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100.tar.gz", - Revision: "94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100", + assertIndex: &etagIndex{ + index: map[string]string{ + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), }, }, { name: "spec.ignore overrides .sourceignore", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { - ignore := "included/file.txt" + ignore := "!ignored/file.txt" obj.Spec.Ignore = &ignore }, - bucketObjects: []*s3MockObject{ + bucketObjects: []*s3mock.Object{ { Key: ".sourceignore", Content: []byte("ignored/file.txt"), @@ -453,24 +454,26 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.tar.gz", - Revision: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + assertIndex: &etagIndex{ + index: map[string]string{ + "ignored/file.txt": "f08907038338288420ae7dc2d30c0497", + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), }, }, { - name: "up-to-date artifact", + name: "Up-to-date artifact", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Status.Artifact = &sourcev1.Artifact{ - Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8", + Revision: "b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479", } }, - bucketObjects: []*s3MockObject{ + bucketObjects: []*s3mock.Object{ { Key: "test.txt", Content: []byte("test"), @@ -479,9 +482,10 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz", - Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8", + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{}, }, @@ -491,7 +495,7 @@ func 
TestBucketReconciler_reconcileMinioSource(t *testing.T) { beforeFunc: func(obj *sourcev1.Bucket) { conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to read test file") }, - bucketObjects: []*s3MockObject{ + bucketObjects: []*s3mock.Object{ { Key: "test.txt", Content: []byte("test"), @@ -500,13 +504,14 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz", - Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8", + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, } @@ -539,9 +544,9 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, } - var server *s3MockServer + var server *s3mock.Server if tt.bucketName != "" { - server = newS3Server(tt.bucketName) + server = s3mock.NewServer(tt.bucketName) server.Objects = tt.bucketObjects server.Start() defer server.Stop() @@ -559,38 +564,39 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { tt.beforeFunc(obj) } - artifact := &sourcev1.Artifact{} - index := make(etagIndex) - got, err := r.reconcileSource(context.TODO(), obj, index, artifact, tmpDir) + index := newEtagIndex() + + got, err := r.reconcileSource(context.TODO(), obj, index, tmpDir) g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(got).To(Equal(tt.want)) - g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy())) + g.Expect(index.Index()).To(Equal(tt.assertIndex.Index())) g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) }) } } -func TestBucketReconciler_reconcileGCPSource(t *testing.T) { +func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { tests := []struct { name string bucketName string - bucketObjects []*gcpMockObject + bucketObjects []*gcsmock.Object secret *corev1.Secret beforeFunc func(obj *sourcev1.Bucket) want sreconcile.Result wantErr bool - assertArtifact sourcev1.Artifact + assertIndex *etagIndex assertConditions []metav1.Condition }{ { - name: "reconciles source", + name: "Reconciles GCS source", bucketName: "dummy", - bucketObjects: []*gcpMockObject{ + bucketObjects: []*gcsmock.Object{ { Key: "test.txt", ContentType: "text/plain", Content: []byte("test"), + Generation: 3, }, }, secret: &corev1.Secret{ @@ -609,31 +615,33 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) { } }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8.tar.gz", - Revision: "23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8", + assertIndex: &etagIndex{ + index: 
map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, { - name: "observes non-existing secretRef", + name: "Observes non-existing secretRef", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.SecretRef = &meta.LocalObjectReference{ Name: "dummy", } }, - want: sreconcile.ResultEmpty, - wantErr: true, + want: sreconcile.ResultEmpty, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"), }, }, { - name: "observes invalid secretRef", + name: "Observes invalid secretRef", bucketName: "dummy", secret: &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -645,119 +653,133 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) { Name: "dummy", } }, - want: sreconcile.ResultEmpty, - wantErr: true, + want: sreconcile.ResultEmpty, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to construct GCP client: invalid 'dummy' secret data: required fields"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid 'dummy' secret data: required fields"), }, }, { - name: "observes non-existing bucket name", + name: "Observes non-existing bucket name", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.BucketName = "invalid" }, - want: sreconcile.ResultEmpty, - wantErr: true, + want: sreconcile.ResultEmpty, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' does not exist"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' not found"), }, }, { - name: "transient bucket name API failure", + name: "Transient bucket name API failure", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.Endpoint = "transient.example.com" obj.Spec.BucketName = "unavailable" }, - want: sreconcile.ResultEmpty, - wantErr: true, + want: sreconcile.ResultEmpty, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to verify existence of bucket 'unavailable'"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to confirm existence of 'unavailable' bucket"), }, }, { name: ".sourceignore", bucketName: "dummy", - bucketObjects: []*gcpMockObject{ + bucketObjects: []*gcsmock.Object{ { Key: ".sourceignore", 
Content: []byte("ignored/file.txt"), ContentType: "text/plain", + Generation: 1, }, { Key: "ignored/file.txt", Content: []byte("ignored/file.txt"), ContentType: "text/plain", + Generation: 4, }, { Key: "included/file.txt", Content: []byte("included/file.txt"), ContentType: "text/plain", + Generation: 3, }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4.tar.gz", - Revision: "7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4", + assertIndex: &etagIndex{ + index: map[string]string{ + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), }, }, { name: "spec.ignore overrides .sourceignore", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { - ignore := "included/file.txt" + ignore := "!ignored/file.txt" obj.Spec.Ignore = &ignore }, - bucketObjects: []*gcpMockObject{ + bucketObjects: []*gcsmock.Object{ { Key: ".sourceignore", Content: []byte("ignored/file.txt"), ContentType: "text/plain", + Generation: 1, }, { Key: "ignored/file.txt", Content: []byte("ignored/file.txt"), ContentType: "text/plain", + Generation: 2, }, { Key: "included/file.txt", Content: []byte("included/file.txt"), ContentType: "text/plain", + Generation: 4, }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.tar.gz", - Revision: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + assertIndex: &etagIndex{ + index: map[string]string{ + "ignored/file.txt": "f08907038338288420ae7dc2d30c0497", + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), }, }, { - name: "up-to-date artifact", + name: "Up-to-date artifact", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Status.Artifact = &sourcev1.Artifact{ - Revision: "23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8", + Revision: "b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479", } }, - bucketObjects: []*gcpMockObject{ + bucketObjects: []*gcsmock.Object{ 
{ Key: "test.txt", Content: []byte("test"), ContentType: "text/plain", + Generation: 2, }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8.tar.gz", - Revision: "23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8", + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{}, }, @@ -767,21 +789,23 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) { beforeFunc: func(obj *sourcev1.Bucket) { conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to read test file") }, - bucketObjects: []*gcpMockObject{ + bucketObjects: []*gcsmock.Object{ { Key: "test.txt", Content: []byte("test"), ContentType: "text/plain", + Generation: 2, }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8.tar.gz", - Revision: "23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8", + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, // TODO: Middleware for mock server to test authentication using secret. @@ -819,7 +843,7 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) { } // Set up the mock GCP bucket server. - server := newGCPServer(tt.bucketName) + server := gcsmock.NewServer(tt.bucketName) server.Objects = tt.bucketObjects server.Start() defer server.Stop() @@ -834,31 +858,28 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) { } // Set the GCP storage host to be used by the GCP client. - g.Expect(os.Setenv(ENV_GCP_STORAGE_HOST, obj.Spec.Endpoint)).ToNot(HaveOccurred()) + g.Expect(os.Setenv(EnvGcpStorageHost, obj.Spec.Endpoint)).ToNot(HaveOccurred()) defer func() { - g.Expect(os.Unsetenv(ENV_GCP_STORAGE_HOST)).ToNot(HaveOccurred()) + g.Expect(os.Unsetenv(EnvGcpStorageHost)).ToNot(HaveOccurred()) }() - artifact := &sourcev1.Artifact{} - index := make(etagIndex) - got, err := r.reconcileSource(context.TODO(), obj, index, artifact, tmpDir) + index := newEtagIndex() + + got, err := r.reconcileSource(context.TODO(), obj, index, tmpDir) + t.Log(err) g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(got).To(Equal(tt.want)) - g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy())) + g.Expect(index.Index()).To(Equal(tt.assertIndex.Index())) g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) }) } } func TestBucketReconciler_reconcileArtifact(t *testing.T) { - // testChecksum is the checksum value of the artifacts created in this - // test. 
- const testChecksum = "4f4fb700ef54461cfa02571ae0db9a0dc1e0cdb5577484a6d75e68dc38e8acc1" - tests := []struct { name string - beforeFunc func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) + beforeFunc func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) afterFunc func(t *WithT, obj *sourcev1.Bucket, dir string) want sreconcile.Result wantErr bool @@ -866,42 +887,45 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }{ { name: "Archiving artifact to storage makes Ready=True", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { obj.Spec.Interval = metav1.Duration{Duration: interval} }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"), + *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), }, }, { - name: "Up-to-date artifact should not update status", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + name: "Up-to-date artifact should not persist and update status", + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { + revision, _ := index.Revision() obj.Spec.Interval = metav1.Duration{Duration: interval} - obj.Status.Artifact = artifact.DeepCopy() + // Incomplete artifact + obj.Status.Artifact = &sourcev1.Artifact{Revision: revision} }, afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) { + // Still incomplete t.Expect(obj.Status.URL).To(BeEmpty()) }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"), + *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), }, }, { name: "Removes ArtifactOutdatedCondition after creating a new artifact", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { obj.Spec.Interval = metav1.Duration{Duration: interval} conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "Foo", "") }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"), + *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), }, }, { name: "Creates latest symlink to the created artifact", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { obj.Spec.Interval = metav1.Duration{Duration: interval} }, afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) { @@ -913,12 +937,12 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"), + 
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), }, }, { name: "Dir path deleted", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred()) }, want: sreconcile.ResultEmpty, @@ -926,7 +950,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }, { name: "Dir path is not a directory", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { // Remove the given directory and create a file for the same // path. t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred()) @@ -969,23 +993,18 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }, } - index := make(etagIndex) - artifact := testStorage.NewArtifactFor(obj.Kind, obj, "existing", "foo.tar.gz") - artifact.Checksum = testChecksum + index := newEtagIndex() if tt.beforeFunc != nil { - tt.beforeFunc(g, obj, artifact, tmpDir) + tt.beforeFunc(g, obj, index, tmpDir) } - got, err := r.reconcileArtifact(context.TODO(), obj, index, &artifact, tmpDir) + got, err := r.reconcileArtifact(context.TODO(), obj, index, tmpDir) g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(got).To(Equal(tt.want)) // On error, artifact is empty. Check artifacts only on successful // reconcile. - if !tt.wantErr { - g.Expect(obj.Status.Artifact).To(MatchArtifact(artifact.DeepCopy())) - } g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) if tt.afterFunc != nil { @@ -998,7 +1017,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { func Test_etagIndex_Revision(t *testing.T) { tests := []struct { name string - list etagIndex + list map[string]string want string wantErr bool }{ @@ -1009,7 +1028,7 @@ func Test_etagIndex_Revision(t *testing.T) { "two": "two", "three": "three", }, - want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc", + want: "c0837b3f32bb67c5275858fdb96595f87801cf3c2f622c049918a051d29b2c7f", }, { name: "index with items in different order", @@ -1018,7 +1037,7 @@ func Test_etagIndex_Revision(t *testing.T) { "one": "one", "two": "two", }, - want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc", + want: "c0837b3f32bb67c5275858fdb96595f87801cf3c2f622c049918a051d29b2c7f", }, { name: "empty index", @@ -1033,7 +1052,8 @@ func Test_etagIndex_Revision(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := tt.list.Revision() + index := &etagIndex{index: tt.list} + got, err := index.Revision() if (err != nil) != tt.wantErr { t.Errorf("revision() error = %v, wantErr %v", err, tt.wantErr) return @@ -1044,277 +1064,3 @@ func Test_etagIndex_Revision(t *testing.T) { }) } } - -// helpers - -func mockFile(root, path, content string) error { - filePath := filepath.Join(root, path) - if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil { - panic(err) - } - if err := os.WriteFile(filePath, []byte(content), 0644); err != nil { - panic(err) - } - return nil -} - -type s3MockObject struct { - Key string - LastModified time.Time - ContentType string - Content []byte -} - -type s3MockServer struct { - srv *httptest.Server - mux *http.ServeMux - - BucketName string - Objects []*s3MockObject -} - -func newS3Server(bucketName string) 
*s3MockServer { - s := &s3MockServer{BucketName: bucketName} - s.mux = http.NewServeMux() - s.mux.Handle(fmt.Sprintf("/%s/", s.BucketName), http.HandlerFunc(s.handler)) - - s.srv = httptest.NewUnstartedServer(s.mux) - - return s -} - -func (s *s3MockServer) Start() { - s.srv.Start() -} - -func (s *s3MockServer) Stop() { - s.srv.Close() -} - -func (s *s3MockServer) HTTPAddress() string { - return s.srv.URL -} - -func (s *s3MockServer) handler(w http.ResponseWriter, r *http.Request) { - key := path.Base(r.URL.Path) - - switch key { - case s.BucketName: - w.Header().Add("Content-Type", "application/xml") - - if r.Method == http.MethodHead { - return - } - - q := r.URL.Query() - - if q["location"] != nil { - fmt.Fprint(w, ` - -Europe - `) - return - } - - contents := "" - for _, o := range s.Objects { - etag := md5.Sum(o.Content) - contents += fmt.Sprintf(` - - %s - %s - %d - "%b" - STANDARD - `, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag) - } - - fmt.Fprintf(w, ` - - - %s - - - %d - 1000 - false - %s - - `, s.BucketName, len(s.Objects), contents) - default: - key, err := filepath.Rel("/"+s.BucketName, r.URL.Path) - if err != nil { - w.WriteHeader(500) - return - } - - var found *s3MockObject - for _, o := range s.Objects { - if key == o.Key { - found = o - } - } - if found == nil { - w.WriteHeader(404) - return - } - - etag := md5.Sum(found.Content) - lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1) - - w.Header().Add("Content-Type", found.ContentType) - w.Header().Add("Last-Modified", lastModified) - w.Header().Add("ETag", fmt.Sprintf("\"%b\"", etag)) - w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content))) - - if r.Method == http.MethodHead { - return - } - - w.Write(found.Content) - } -} - -type gcpMockObject struct { - Key string - ContentType string - Content []byte -} - -type gcpMockServer struct { - srv *httptest.Server - mux *http.ServeMux - - BucketName string - Etag string - Objects []*gcpMockObject - Close func() -} - -func newGCPServer(bucketName string) *gcpMockServer { - s := &gcpMockServer{BucketName: bucketName} - s.mux = http.NewServeMux() - s.mux.Handle("/", http.HandlerFunc(s.handler)) - - s.srv = httptest.NewUnstartedServer(s.mux) - - return s -} - -func (gs *gcpMockServer) Start() { - gs.srv.Start() -} - -func (gs *gcpMockServer) Stop() { - gs.srv.Close() -} - -func (gs *gcpMockServer) HTTPAddress() string { - return gs.srv.URL -} - -func (gs *gcpMockServer) GetAllObjects() *raw.Objects { - objs := &raw.Objects{} - for _, o := range gs.Objects { - objs.Items = append(objs.Items, getGCPObject(gs.BucketName, *o)) - } - return objs -} - -func (gs *gcpMockServer) GetObjectFile(key string) ([]byte, error) { - for _, o := range gs.Objects { - if o.Key == key { - return o.Content, nil - } - } - return nil, fmt.Errorf("not found") -} - -func (gs *gcpMockServer) handler(w http.ResponseWriter, r *http.Request) { - if strings.HasPrefix(r.RequestURI, "/b/") { - // Handle the bucket info related queries. - if r.RequestURI == fmt.Sprintf("/b/%s?alt=json&prettyPrint=false&projection=full", gs.BucketName) { - // Return info about the bucket. - response := getGCPBucket(gs.BucketName, gs.Etag) - jsonResponse, err := json.Marshal(response) - if err != nil { - w.WriteHeader(500) - return - } - w.WriteHeader(200) - w.Write(jsonResponse) - return - } else if strings.Contains(r.RequestURI, "/o/") { - // Return info about object in the bucket. 
- var obj *gcpMockObject - for _, o := range gs.Objects { - // The object key in the URI is escaped. - // e.g.: /b/dummy/o/included%2Ffile.txt?alt=json&prettyPrint=false&projection=full - if r.RequestURI == fmt.Sprintf("/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", gs.BucketName, url.QueryEscape(o.Key)) { - obj = o - } - } - if obj != nil { - response := getGCPObject(gs.BucketName, *obj) - jsonResponse, err := json.Marshal(response) - if err != nil { - w.WriteHeader(500) - return - } - w.WriteHeader(200) - w.Write(jsonResponse) - return - } - w.WriteHeader(404) - return - } else if strings.Contains(r.RequestURI, "/o?") { - // Return info about all the objects in the bucket. - response := gs.GetAllObjects() - jsonResponse, err := json.Marshal(response) - if err != nil { - w.WriteHeader(500) - return - } - w.WriteHeader(200) - w.Write(jsonResponse) - return - } - w.WriteHeader(404) - return - } else { - // Handle object file query. - bucketPrefix := fmt.Sprintf("/%s/", gs.BucketName) - if strings.HasPrefix(r.RequestURI, bucketPrefix) { - // The URL path is of the format //included/file.txt. - // Extract the object key by discarding the bucket prefix. - key := strings.TrimPrefix(r.URL.Path, bucketPrefix) - // Handle returning object file in a bucket. - response, err := gs.GetObjectFile(key) - if err != nil { - w.WriteHeader(404) - return - } - w.WriteHeader(200) - w.Write(response) - return - } - w.WriteHeader(404) - return - } -} - -func getGCPObject(bucket string, obj gcpMockObject) *raw.Object { - return &raw.Object{ - Bucket: bucket, - Name: obj.Key, - ContentType: obj.ContentType, - } -} - -func getGCPBucket(name, eTag string) *raw.Bucket { - return &raw.Bucket{ - Name: name, - Location: "loc", - Etag: eTag, - } -} diff --git a/go.mod b/go.mod index 499fc12d..64879a7e 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/go-git/go-billy/v5 v5.3.1 github.com/go-git/go-git/v5 v5.4.2 github.com/go-logr/logr v1.2.2 + github.com/google/uuid v1.3.0 github.com/libgit2/git2go/v33 v33.0.6 github.com/minio/minio-go/v7 v7.0.15 github.com/onsi/gomega v1.17.0 @@ -97,7 +98,6 @@ require ( github.com/google/go-cmp v0.5.7 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/gorilla/mux v1.8.0 // indirect diff --git a/internal/mock/gcs/server.go b/internal/mock/gcs/server.go new file mode 100644 index 00000000..b8b1cd92 --- /dev/null +++ b/internal/mock/gcs/server.go @@ -0,0 +1,220 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gcs + +import ( + "crypto/md5" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "strings" + + raw "google.golang.org/api/storage/v1" +) + +var ( + ObjectNotFound = errors.New("object not found") +) + +// Object is a mock Server object. 
+type Object struct { + Key string + Generation int64 + MetaGeneration int64 + ContentType string + Content []byte +} + +// Server is a simple Google Cloud Storage mock server. +// It serves the provided Objects for the BucketName on the HTTPAddress when +// Start or StartTLS is called. +// It provides primitive support for "Generation Conditions" when Object contents +// are fetched. +// Ref: https://pkg.go.dev/cloud.google.com/go/storage#hdr-Conditions +type Server struct { + srv *httptest.Server + mux *http.ServeMux + + BucketName string + Objects []*Object +} + +func NewServer(bucketName string) *Server { + s := &Server{BucketName: bucketName} + s.mux = http.NewServeMux() + s.mux.Handle("/", http.HandlerFunc(s.handler)) + + s.srv = httptest.NewUnstartedServer(s.mux) + + return s +} + +func (s *Server) Start() { + s.srv.Start() +} + +func (s *Server) StartTLS(config *tls.Config) { + s.srv.TLS = config + s.srv.StartTLS() +} + +func (s *Server) Stop() { + s.srv.Close() +} + +func (s *Server) HTTPAddress() string { + return s.srv.URL +} + +func (s *Server) getAllObjects() *raw.Objects { + objs := &raw.Objects{} + for _, o := range s.Objects { + objs.Items = append(objs.Items, getGCSObject(s.BucketName, *o)) + } + return objs +} + +func (s *Server) getObjectFile(key string, generation int64) ([]byte, error) { + for _, o := range s.Objects { + if o.Key == key { + if generation == 0 || generation == o.Generation { + return o.Content, nil + } + } + } + return nil, ObjectNotFound +} + +func (s *Server) handler(w http.ResponseWriter, r *http.Request) { + switch { + // Handle Bucket metadata related queries + case strings.HasPrefix(r.RequestURI, "/b/"): + switch { + // Return metadata about the Bucket + case r.RequestURI == fmt.Sprintf("/b/%s?alt=json&prettyPrint=false&projection=full", s.BucketName): + etag := md5.New() + for _, v := range s.Objects { + etag.Write(v.Content) + } + response := getGCSBucket(s.BucketName, fmt.Sprintf("%x", etag.Sum(nil))) + jsonResponse, err := json.Marshal(response) + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Write(jsonResponse) + return + // Return metadata about a Bucket object + case strings.Contains(r.RequestURI, "/o/"): + var obj *Object + for _, o := range s.Objects { + // The object key in the URI is escaped. + // e.g.: /b/dummy/o/included%2Ffile.txt?alt=json&prettyPrint=false&projection=full + if r.RequestURI == fmt.Sprintf("/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", s.BucketName, url.QueryEscape(o.Key)) { + obj = o + break + } + } + if obj != nil { + response := getGCSObject(s.BucketName, *obj) + jsonResponse, err := json.Marshal(response) + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Write(jsonResponse) + return + } + w.WriteHeader(404) + return + // Return metadata about all objects in the Bucket + case strings.Contains(r.RequestURI, "/o?"): + response := s.getAllObjects() + jsonResponse, err := json.Marshal(response) + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Write(jsonResponse) + return + default: + w.WriteHeader(404) + return + } + // Handle object file query + default: + bucketPrefix := fmt.Sprintf("/%s/", s.BucketName) + if strings.HasPrefix(r.RequestURI, bucketPrefix) { + // The URL path is of the format /<bucket-name>/included/file.txt. + // Extract the object key by discarding the bucket prefix.
+ key := strings.TrimPrefix(r.URL.Path, bucketPrefix) + + // Support "Generation Conditions" + // https://pkg.go.dev/cloud.google.com/go/storage#hdr-Conditions + var generation int64 + if matchGeneration := r.URL.Query().Get("ifGenerationMatch"); matchGeneration != "" { + var err error + if generation, err = strconv.ParseInt(matchGeneration, 10, 64); err != nil { + w.WriteHeader(500) + return + } + } + + // Handle returning object file in a bucket. + response, err := s.getObjectFile(key, generation) + if err != nil { + w.WriteHeader(404) + return + } + w.WriteHeader(200) + w.Write(response) + return + } + w.WriteHeader(404) + return + } +} + +func getGCSObject(bucket string, obj Object) *raw.Object { + hash := md5.Sum(obj.Content) + etag := fmt.Sprintf("%x", hash) + return &raw.Object{ + Bucket: bucket, + Name: obj.Key, + ContentType: obj.ContentType, + Generation: obj.Generation, + Metageneration: obj.MetaGeneration, + Md5Hash: etag, + Etag: etag, + } +} + +func getGCSBucket(name, eTag string) *raw.Bucket { + return &raw.Bucket{ + Name: name, + Location: "loc", + Etag: eTag, + } +} diff --git a/internal/mock/s3/server.go b/internal/mock/s3/server.go new file mode 100644 index 00000000..904f1942 --- /dev/null +++ b/internal/mock/s3/server.go @@ -0,0 +1,157 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package s3 + +import ( + "crypto/md5" + "crypto/tls" + "fmt" + "net/http" + "net/http/httptest" + "path" + "path/filepath" + "strings" + "time" +) + +// Object is a mock Server object. +type Object struct { + Key string + LastModified time.Time + ContentType string + Content []byte +} + +// Server is a simple AWS S3 mock server. +// It serves the provided Objects for the BucketName on the HTTPAddress when +// Start or StartTLS is called. 
+type Server struct { + srv *httptest.Server + mux *http.ServeMux + + BucketName string + Objects []*Object +} + +func NewServer(bucketName string) *Server { + s := &Server{BucketName: bucketName} + s.mux = http.NewServeMux() + s.mux.Handle("/", http.HandlerFunc(s.handler)) + + s.srv = httptest.NewUnstartedServer(s.mux) + + return s +} + +func (s *Server) Start() { + s.srv.Start() +} + +func (s *Server) StartTLS(config *tls.Config) { + s.srv.TLS = config + s.srv.StartTLS() +} + +func (s *Server) Stop() { + s.srv.Close() +} + +func (s *Server) HTTPAddress() string { + return s.srv.URL +} + +func (s *Server) handler(w http.ResponseWriter, r *http.Request) { + key := path.Base(r.URL.Path) + + switch key { + case s.BucketName: + w.Header().Add("Content-Type", "application/xml") + + if r.Method == http.MethodHead { + w.WriteHeader(200) + return + } + + if r.URL.Query().Has("location") { + w.WriteHeader(200) + w.Write([]byte(` +<?xml version="1.0" encoding="UTF-8"?> +<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">Europe</LocationConstraint> + `)) + return + } + + contents := "" + for _, o := range s.Objects { + etag := md5.Sum(o.Content) + contents += fmt.Sprintf(` + <Contents> + <Key>%s</Key> + <LastModified>%s</LastModified> + <Size>%d</Size> + <ETag>"%x"</ETag> + <StorageClass>STANDARD</StorageClass> + </Contents>`, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag) + } + + fmt.Fprintf(w, ` +<?xml version="1.0" encoding="UTF-8"?> +<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> + <Name>%s</Name> + <Prefix/> + <Marker/> + <KeyCount>%d</KeyCount> + <MaxKeys>1000</MaxKeys> + <IsTruncated>false</IsTruncated> + %s +</ListBucketResult> + `, s.BucketName, len(s.Objects), contents) + default: + key, err := filepath.Rel("/"+s.BucketName, r.URL.Path) + if err != nil { + w.WriteHeader(500) + return + } + + var found *Object + for _, o := range s.Objects { + if key == o.Key { + found = o + } + } + if found == nil { + w.WriteHeader(404) + return + } + + etag := md5.Sum(found.Content) + lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1) + + w.Header().Add("Content-Type", found.ContentType) + w.Header().Add("Last-Modified", lastModified) + w.Header().Add("ETag", fmt.Sprintf("\"%x\"", etag)) + w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content))) + + if r.Method == http.MethodHead { + w.WriteHeader(200) + return + } + + w.WriteHeader(200) + w.Write(found.Content) + } +} diff --git a/pkg/gcp/gcp.go b/pkg/gcp/gcp.go index f98e498c..836ba341 100644 --- a/pkg/gcp/gcp.go +++ b/pkg/gcp/gcp.go @@ -28,6 +28,8 @@ import ( "github.com/go-logr/logr" "google.golang.org/api/iterator" "google.golang.org/api/option" + corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" ) var ( @@ -37,12 +39,10 @@ var ( // ErrorDirectoryExists is an error returned when the filename provided // is a directory. ErrorDirectoryExists = errors.New("filename is a directory") - // ErrorObjectDoesNotExist is an error returned when the object whose name - // is provided does not exist. - ErrorObjectDoesNotExist = errors.New("object does not exist") ) -type GCPClient struct { +// GCSClient is a minimal Google Cloud Storage client for fetching objects. +type GCSClient struct { // client for interacting with the Google Cloud // Storage APIs. *gcpstorage.Client @@ -50,27 +50,39 @@ type GCPClient struct { // NewClient creates a new GCP storage client. The Client will automatically look for the Google Application // Credential environment variable or look for the Google Application Credential file. -func NewClient(ctx context.Context, opts ...option.ClientOption) (*GCPClient, error) { - client, err := gcpstorage.NewClient(ctx, opts...)
- if err != nil { - return nil, err +func NewClient(ctx context.Context, secret *corev1.Secret) (*GCSClient, error) { + c := &GCSClient{} + if secret != nil { + client, err := gcpstorage.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"])) + if err != nil { + return nil, err + } + c.Client = client + } else { + client, err := gcpstorage.NewClient(ctx) + if err != nil { + return nil, err + } + c.Client = client } - - return &GCPClient{Client: client}, nil + return c, nil } -// ValidateSecret validates the credential secrets -// It ensures that needed secret fields are not missing. -func ValidateSecret(secret map[string][]byte, name string) error { - if _, exists := secret["serviceaccount"]; !exists { - return fmt.Errorf("invalid '%s' secret data: required fields 'serviceaccount'", name) +// ValidateSecret validates the credential secret. The provided Secret may +// be nil. +func ValidateSecret(secret *corev1.Secret) error { + if secret == nil { + return nil + } + if _, exists := secret.Data["serviceaccount"]; !exists { + return fmt.Errorf("invalid '%s' secret data: required fields 'serviceaccount'", secret.Name) } - return nil } -// BucketExists checks if the bucket with the provided name exists. -func (c *GCPClient) BucketExists(ctx context.Context, bucketName string) (bool, error) { +// BucketExists returns if an object storage bucket with the provided name +// exists, or returns a (client) error. +func (c *GCSClient) BucketExists(ctx context.Context, bucketName string) (bool, error) { _, err := c.Client.Bucket(bucketName).Attrs(ctx) if err == gcpstorage.ErrBucketNotExist { // Not returning error to be compatible with minio's API. @@ -82,34 +94,23 @@ func (c *GCPClient) BucketExists(ctx context.Context, bucketName string) (bool, return true, nil } -// ObjectExists checks if the object with the provided name exists. -func (c *GCPClient) ObjectExists(ctx context.Context, bucketName, objectName string) (bool, error) { - _, err := c.Client.Bucket(bucketName).Object(objectName).Attrs(ctx) - // ErrObjectNotExist is returned if the object does not exist - if err == gcpstorage.ErrObjectNotExist { - return false, err - } - if err != nil { - return false, err - } - return true, nil -} - -// FGetObject gets the object from the bucket and downloads the object locally -func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) error { +// FGetObject gets the object from the provided object storage bucket, and +// writes it to targetPath. +// It returns the etag of the successfully fetched file, or any error. +func (c *GCSClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) (string, error) { // Verify if destination already exists. dirStatus, err := os.Stat(localPath) if err == nil { // If the destination exists and is a directory. if dirStatus.IsDir() { - return ErrorDirectoryExists + return "", ErrorDirectoryExists } } // Proceed if file does not exist. return for all other errors. if err != nil { if !os.IsNotExist(err) { - return err + return "", err } } @@ -118,56 +119,79 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca if objectDir != "" { // Create any missing top level directories. if err := os.MkdirAll(objectDir, 0700); err != nil { - return err + return "", err } } - // ObjectExists verifies if object exists and you have permission to access. - // Check if the object exists and if you have permission to access it. 
- exists, err := c.ObjectExists(ctx, bucketName, objectName) + // Get Object attributes. + objAttr, err := c.Client.Bucket(bucketName).Object(objectName).Attrs(ctx) if err != nil { - return err - } - if !exists { - return ErrorObjectDoesNotExist + return "", err } + // Prepare target file. objectFile, err := os.OpenFile(localPath, os.O_CREATE|os.O_WRONLY, 0600) if err != nil { - return err + return "", err } - // Get Object from GCP Bucket - objectReader, err := c.Client.Bucket(bucketName).Object(objectName).NewReader(ctx) + // Get Object data. + objectReader, err := c.Client.Bucket(bucketName).Object(objectName).If(gcpstorage.Conditions{ + GenerationMatch: objAttr.Generation, + }).NewReader(ctx) if err != nil { - return err + return "", err } - defer objectReader.Close() + defer func() { + if err = objectReader.Close(); err != nil { + ctrl.LoggerFrom(ctx).Error(err, "failed to close object reader") + } + }() // Write Object to file. if _, err := io.Copy(objectFile, objectReader); err != nil { - return err + return "", err } // Close the file. if err := objectFile.Close(); err != nil { - return err + return "", err } + return objAttr.Etag, nil +} + +// VisitObjects iterates over the items in the provided object storage +// bucket, calling visit for every item. +// If the underlying client or the visit callback returns an error, +// it returns early. +func (c *GCSClient) VisitObjects(ctx context.Context, bucketName string, visit func(path, etag string) error) error { + items := c.Client.Bucket(bucketName).Objects(ctx, nil) + for { + object, err := items.Next() + if err == IteratorDone { + break + } + if err != nil { + err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err) + return err + } + if err = visit(object.Name, object.Etag); err != nil { + return err + } + } return nil } -// ListObjects lists the objects/contents of the bucket whose bucket name is provided. -// the objects are returned as an Objectiterator and .Next() has to be called on them -// to loop through the Objects. -func (c *GCPClient) ListObjects(ctx context.Context, bucketName string, query *gcpstorage.Query) *gcpstorage.ObjectIterator { - items := c.Client.Bucket(bucketName).Objects(ctx, query) - return items -} - -// Close closes the GCP Client and logs any useful errors -func (c *GCPClient) Close(log logr.Logger) { +// Close closes the GCP Client and logs any useful errors. +func (c *GCSClient) Close(ctx context.Context) { + log := logr.FromContextOrDiscard(ctx) if err := c.Client.Close(); err != nil { - log.Error(err, "GCP Provider") + log.Error(err, "closing GCP client") } } + +// ObjectIsNotFound checks if the error provided is storage.ErrObjectNotExist. +func (c *GCSClient) ObjectIsNotFound(err error) bool { + return errors.Is(err, gcpstorage.ErrObjectNotExist) +} diff --git a/pkg/gcp/gcp_test.go b/pkg/gcp/gcp_test.go index 6c27accf..ded00a32 100644 --- a/pkg/gcp/gcp_test.go +++ b/pkg/gcp/gcp_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package gcp_test +package gcp import ( "context" @@ -32,17 +32,20 @@ import ( "time" gcpstorage "cloud.google.com/go/storage" - "github.com/fluxcd/source-controller/pkg/gcp" "google.golang.org/api/googleapi" raw "google.golang.org/api/storage/v1" "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "google.golang.org/api/option" ) const ( - bucketName string = "test-bucket" - objectName string = "test.yaml" + bucketName string = "test-bucket" + objectName string = "test.yaml" + objectGeneration int64 = 3 + objectEtag string = "bFbHCDvedeecefdgmfmhfuRxBdcedGe96S82XJOAXxjJpk=" ) var ( @@ -50,12 +53,34 @@ var ( client *gcpstorage.Client close func() err error + secret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "serviceaccount": []byte("ewogICAgInR5cGUiOiAic2VydmljZV9hY2NvdW50IiwKICAgICJwcm9qZWN0X2lkIjogInBvZGluZm8iLAogICAgInByaXZhdGVfa2V5X2lkIjogIjI4cXdnaDNnZGY1aGozZ2I1ZmozZ3N1NXlmZ2gzNGY0NTMyNDU2OGh5MiIsCiAgICAicHJpdmF0ZV9rZXkiOiAiLS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tXG5Id2V0aGd5MTIzaHVnZ2hoaGJkY3U2MzU2ZGd5amhzdmd2R0ZESFlnY2RqYnZjZGhic3g2M2Ncbjc2dGd5Y2ZlaHVoVkdURllmdzZ0N3lkZ3lWZ3lkaGV5aHVnZ3ljdWhland5NnQzNWZ0aHl1aGVndmNldGZcblRGVUhHVHlnZ2h1Ymh4ZTY1eWd0NnRneWVkZ3kzMjZodWN5dnN1aGJoY3Zjc2poY3NqaGNzdmdkdEhGQ0dpXG5IY3llNnR5eWczZ2Z5dWhjaGNzYmh5Z2NpamRiaHl5VEY2NnR1aGNldnVoZGNiaHVoaHZmdGN1aGJoM3VoN3Q2eVxuZ2d2ZnRVSGJoNnQ1cmZ0aGh1R1ZSdGZqaGJmY3JkNXI2N3l1aHV2Z0ZUWWpndnRmeWdoYmZjZHJoeWpoYmZjdGZkZnlodmZnXG50Z3ZnZ3RmeWdodmZ0NnR1Z3ZURjVyNjZ0dWpoZ3ZmcnR5aGhnZmN0Nnk3eXRmcjVjdHZnaGJoaHZ0Z2hoanZjdHRmeWNmXG5mZnhmZ2hqYnZnY2d5dDY3dWpiZ3ZjdGZ5aFZDN3VodmdjeWp2aGhqdnl1amNcbmNnZ2hndmdjZmhnZzc2NTQ1NHRjZnRoaGdmdHloaHZ2eXZ2ZmZnZnJ5eXU3N3JlcmVkc3dmdGhoZ2ZjZnR5Y2ZkcnR0ZmhmL1xuLS0tLS1FTkQgUFJJVkFURSBLRVktLS0tLVxuIiwKICAgICJjbGllbnRfZW1haWwiOiAidGVzdEBwb2RpbmZvLmlhbS5nc2VydmljZWFjY291bnQuY29tIiwKICAgICJjbGllbnRfaWQiOiAiMzI2NTc2MzQ2Nzg3NjI1MzY3NDYiLAogICAgImF1dGhfdXJpIjogImh0dHBzOi8vYWNjb3VudHMuZ29vZ2xlLmNvbS9vL29hdXRoMi9hdXRoIiwKICAgICJ0b2tlbl91cmkiOiAiaHR0cHM6Ly9vYXV0aDIuZ29vZ2xlYXBpcy5jb20vdG9rZW4iLAogICAgImF1dGhfcHJvdmlkZXJfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9vYXV0aDIvdjEvY2VydHMiLAogICAgImNsaWVudF94NTA5X2NlcnRfdXJsIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL3JvYm90L3YxL21ldGFkYXRhL3g1MDkvdGVzdCU0MHBvZGluZm8uaWFtLmdzZXJ2aWNlYWNjb3VudC5jb20iCn0="), + }, + Type: "Opaque", + } + badSecret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "username": []byte("test-user"), + }, + Type: "Opaque", + } ) func TestMain(m *testing.M) { hc, close = newTestServer(func(w http.ResponseWriter, r *http.Request) { io.Copy(io.Discard, r.Body) - if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s?alt=json&prettyPrint=false&projection=full", bucketName) { + + switch r.RequestURI { + case fmt.Sprintf("/storage/v1/b/%s?alt=json&prettyPrint=false&projection=full", bucketName): w.WriteHeader(200) response := getBucket() jsonResponse, err := json.Marshal(response) @@ -66,7 +91,7 @@ func TestMain(m *testing.M) { if err != nil { log.Fatalf("error writing jsonResponse %v\n", err) } - } else if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName) { + case fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName): w.WriteHeader(200) response := getObject() jsonResponse, err := json.Marshal(response) @@ -77,9 +102,10 @@ 
func TestMain(m *testing.M) { if err != nil { log.Fatalf("error writing jsonResponse %v\n", err) } - } else if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o?alt=json&delimiter=&endOffset=&pageToken=&prefix=&prettyPrint=false&projection=full&startOffset=&versions=false", bucketName) { + case fmt.Sprintf("/storage/v1/b/%s/o?alt=json&delimiter=&endOffset=&pageToken=&prefix=&prettyPrint=false&projection=full&startOffset=&versions=false", bucketName): w.WriteHeader(200) - response := getObject() + response := &raw.Objects{} + response.Items = append(response.Items, getObject()) jsonResponse, err := json.Marshal(response) if err != nil { log.Fatalf("error marshalling response %v\n", err) @@ -88,14 +114,16 @@ func TestMain(m *testing.M) { if err != nil { log.Fatalf("error writing jsonResponse %v\n", err) } - } else if r.RequestURI == fmt.Sprintf("/%s/test.yaml", bucketName) || r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName) { + case fmt.Sprintf("/%s/test.yaml", bucketName), + fmt.Sprintf("/%s/test.yaml?ifGenerationMatch=%d", bucketName, objectGeneration), + fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName): w.WriteHeader(200) response := getObjectFile() _, err = w.Write([]byte(response)) if err != nil { log.Fatalf("error writing response %v\n", err) } - } else { + default: w.WriteHeader(404) } }) @@ -109,14 +137,15 @@ func TestMain(m *testing.M) { os.Exit(run) } -func TestNewClient(t *testing.T) { - gcpClient, err := gcp.NewClient(context.Background(), option.WithHTTPClient(hc)) - assert.NilError(t, err) - assert.Assert(t, gcpClient != nil) +func TestNewClientWithSecretErr(t *testing.T) { + gcpClient, err := NewClient(context.Background(), secret.DeepCopy()) + t.Log(err) + assert.Error(t, err, "dialing: invalid character 'e' looking for beginning of value") + assert.Assert(t, gcpClient == nil) } func TestBucketExists(t *testing.T) { - gcpClient := &gcp.GCPClient{ + gcpClient := &GCSClient{ Client: client, } exists, err := gcpClient.BucketExists(context.Background(), bucketName) @@ -126,7 +155,7 @@ func TestBucketExists(t *testing.T) { func TestBucketNotExists(t *testing.T) { bucket := "notexistsbucket" - gcpClient := &gcp.GCPClient{ + gcpClient := &GCSClient{ Client: client, } exists, err := gcpClient.BucketExists(context.Background(), bucket) @@ -134,55 +163,57 @@ func TestBucketNotExists(t *testing.T) { assert.Assert(t, !exists) } -func TestObjectExists(t *testing.T) { - gcpClient := &gcp.GCPClient{ +func TestVisitObjects(t *testing.T) { + gcpClient := &GCSClient{ Client: client, } - exists, err := gcpClient.ObjectExists(context.Background(), bucketName, objectName) - if err == gcpstorage.ErrObjectNotExist { - assert.NilError(t, err) - } + keys := []string{} + etags := []string{} + err := gcpClient.VisitObjects(context.Background(), bucketName, func(key, etag string) error { + keys = append(keys, key) + etags = append(etags, etag) + return nil + }) assert.NilError(t, err) - assert.Assert(t, exists) + assert.DeepEqual(t, keys, []string{objectName}) + assert.DeepEqual(t, etags, []string{objectEtag}) } -func TestObjectNotExists(t *testing.T) { - object := "doesnotexists.yaml" - gcpClient := &gcp.GCPClient{ +func TestVisitObjectsErr(t *testing.T) { + gcpClient := &GCSClient{ Client: client, } - exists, err := gcpClient.ObjectExists(context.Background(), bucketName, object) - assert.Error(t, err, gcpstorage.ErrObjectNotExist.Error()) - assert.Assert(t, !exists) + 
badBucketName := "bad-bucket" + err := gcpClient.VisitObjects(context.Background(), badBucketName, func(key, etag string) error { + return nil + }) + assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName)) } -func TestListObjects(t *testing.T) { - gcpClient := &gcp.GCPClient{ +func TestVisitObjectsCallbackErr(t *testing.T) { + gcpClient := &GCSClient{ Client: client, } - objectIterator := gcpClient.ListObjects(context.Background(), bucketName, nil) - for { - _, err := objectIterator.Next() - if err == gcp.IteratorDone { - break - } - assert.NilError(t, err) - } - assert.Assert(t, objectIterator != nil) + mockErr := fmt.Errorf("mock") + err := gcpClient.VisitObjects(context.Background(), bucketName, func(key, etag string) error { + return mockErr + }) + assert.Error(t, err, mockErr.Error()) } func TestFGetObject(t *testing.T) { tempDir, err := os.MkdirTemp("", bucketName) assert.NilError(t, err) defer os.RemoveAll(tempDir) - gcpClient := &gcp.GCPClient{ + gcpClient := &GCSClient{ Client: client, } localPath := filepath.Join(tempDir, objectName) - err = gcpClient.FGetObject(context.Background(), bucketName, objectName, localPath) + etag, err := gcpClient.FGetObject(context.Background(), bucketName, objectName, localPath) if err != io.EOF { assert.NilError(t, err) } + assert.Equal(t, etag, objectEtag) } func TestFGetObjectNotExists(t *testing.T) { @@ -190,24 +221,25 @@ func TestFGetObjectNotExists(t *testing.T) { tempDir, err := os.MkdirTemp("", bucketName) assert.NilError(t, err) defer os.RemoveAll(tempDir) - gcpClient := &gcp.GCPClient{ + gcsClient := &GCSClient{ Client: client, } localPath := filepath.Join(tempDir, object) - err = gcpClient.FGetObject(context.Background(), bucketName, object, localPath) + _, err = gcsClient.FGetObject(context.Background(), bucketName, object, localPath) if err != io.EOF { assert.Error(t, err, "storage: object doesn't exist") + assert.Check(t, gcsClient.ObjectIsNotFound(err)) } } func TestFGetObjectDirectoryIsFileName(t *testing.T) { tempDir, err := os.MkdirTemp("", bucketName) - defer os.RemoveAll(tempDir) assert.NilError(t, err) - gcpClient := &gcp.GCPClient{ + defer os.RemoveAll(tempDir) + gcpClient := &GCSClient{ Client: client, } - err = gcpClient.FGetObject(context.Background(), bucketName, objectName, tempDir) + _, err = gcpClient.FGetObject(context.Background(), bucketName, objectName, tempDir) if err != io.EOF { assert.Error(t, err, "filename is a directory") } @@ -216,35 +248,27 @@ func TestFGetObjectDirectoryIsFileName(t *testing.T) { func TestValidateSecret(t *testing.T) { t.Parallel() testCases := []struct { - title string - secret map[string][]byte name string + secret *corev1.Secret error bool }{ { - "Test Case 1", - map[string][]byte{ - "serviceaccount": []byte("serviceaccount"), - }, - "Service Account", - false, + name: "valid secret", + secret: secret.DeepCopy(), }, { - "Test Case 2", - map[string][]byte{ - "data": []byte("data"), - }, - "Service Account", - true, + name: "invalid secret", + secret: badSecret.DeepCopy(), + error: true, }, } for _, testCase := range testCases { - testCase := testCase - t.Run(testCase.title, func(t *testing.T) { + tt := testCase + t.Run(tt.name, func(t *testing.T) { t.Parallel() - err := gcp.ValidateSecret(testCase.secret, testCase.name) - if testCase.error { - assert.Error(t, err, fmt.Sprintf("invalid '%v' secret data: required fields 'serviceaccount'", testCase.name)) + err := ValidateSecret(tt.secret) + if tt.error { + assert.Error(t, 
err, fmt.Sprintf("invalid '%v' secret data: required fields 'serviceaccount'", tt.secret.Name)) } else { assert.NilError(t, err) } @@ -280,7 +304,10 @@ func getObject() *raw.Object { ContentLanguage: "en-us", Size: 1 << 20, CustomTime: customTime.Format(time.RFC3339), - Md5Hash: "bFbHCDvedeecefdgmfmhfuRxBdcedGe96S82XJOAXxjJpk=", + Generation: objectGeneration, + Metageneration: 3, + Etag: objectEtag, + Md5Hash: objectEtag, } } diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go new file mode 100644 index 00000000..f1930dbd --- /dev/null +++ b/pkg/minio/minio.go @@ -0,0 +1,135 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package minio + +import ( + "context" + "errors" + "fmt" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/minio/minio-go/v7/pkg/s3utils" + corev1 "k8s.io/api/core/v1" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" +) + +// MinioClient is a minimal Minio client for fetching files from S3 compatible +// storage APIs. +type MinioClient struct { + *minio.Client +} + +// NewClient creates a new Minio storage client. +func NewClient(bucket *sourcev1.Bucket, secret *corev1.Secret) (*MinioClient, error) { + opt := minio.Options{ + Region: bucket.Spec.Region, + Secure: !bucket.Spec.Insecure, + BucketLookup: minio.BucketLookupPath, + } + + if secret != nil { + var accessKey, secretKey string + if k, ok := secret.Data["accesskey"]; ok { + accessKey = string(k) + } + if k, ok := secret.Data["secretkey"]; ok { + secretKey = string(k) + } + if accessKey != "" && secretKey != "" { + opt.Creds = credentials.NewStaticV4(accessKey, secretKey, "") + } + } else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider { + opt.Creds = credentials.NewIAM("") + } + + client, err := minio.New(bucket.Spec.Endpoint, &opt) + if err != nil { + return nil, err + } + return &MinioClient{Client: client}, nil +} + +// ValidateSecret validates the credential secret. The provided Secret may +// be nil. +func ValidateSecret(secret *corev1.Secret) error { + if secret == nil { + return nil + } + err := fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) + if _, ok := secret.Data["accesskey"]; !ok { + return err + } + if _, ok := secret.Data["secretkey"]; !ok { + return err + } + return nil +} + +// FGetObject gets the object from the provided object storage bucket, and +// writes it to targetPath. +// It returns the etag of the successfully fetched file, or any error. 
+func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) (string, error) { + stat, err := c.Client.StatObject(ctx, bucketName, objectName, minio.GetObjectOptions{}) + if err != nil { + return "", err + } + opts := minio.GetObjectOptions{} + if err = opts.SetMatchETag(stat.ETag); err != nil { + return "", err + } + if err = c.Client.FGetObject(ctx, bucketName, objectName, localPath, opts); err != nil { + return "", err + } + return stat.ETag, nil +} + +// VisitObjects iterates over the items in the provided object storage +// bucket, calling visit for every item. +// If the underlying client or the visit callback returns an error, +// it returns early. +func (c *MinioClient) VisitObjects(ctx context.Context, bucketName string, visit func(key, etag string) error) error { + for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{ + Recursive: true, + UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()), + }) { + if object.Err != nil { + err := fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, object.Err) + return err + } + + if err := visit(object.Key, object.ETag); err != nil { + return err + } + } + return nil +} + +// ObjectIsNotFound checks if the error provided is a minio.ErrResponse +// with "NoSuchKey" code. +func (c *MinioClient) ObjectIsNotFound(err error) bool { + if resp := new(minio.ErrorResponse); errors.As(err, resp) { + return resp.Code == "NoSuchKey" + } + return false +} + +// Close closes the Minio Client and logs any useful errors. +func (c *MinioClient) Close(_ context.Context) { + // Minio client does not provide a close method +} diff --git a/pkg/minio/minio_test.go b/pkg/minio/minio_test.go new file mode 100644 index 00000000..d391b127 --- /dev/null +++ b/pkg/minio/minio_test.go @@ -0,0 +1,283 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package minio + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/fluxcd/pkg/apis/meta" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + "github.com/fluxcd/source-controller/pkg/sourceignore" + + "github.com/google/uuid" + miniov7 "github.com/minio/minio-go/v7" + "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + objectName string = "test.yaml" + objectEtag string = "2020beab5f1711919157756379622d1d" + region string = "us-east-1" +) + +var ( + minioClient *MinioClient + bucketName = "test-bucket-minio" + uuid.New().String() + secret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "accesskey": []byte("Q3AM3UQ867SPQQA43P2F"), + "secretkey": []byte("zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"), + }, + Type: "Opaque", + } + emptySecret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-secret", + Namespace: "default", + }, + Data: map[string][]byte{}, + Type: "Opaque", + } + bucket = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "generic", + Insecure: true, + SecretRef: &meta.LocalObjectReference{ + Name: secret.Name, + }, + }, + } + bucketAwsProvider = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "aws", + Insecure: true, + }, + } +) + +func TestMain(m *testing.M) { + var err error + ctx := context.Background() + minioClient, err = NewClient(bucket.DeepCopy(), secret.DeepCopy()) + if err != nil { + log.Fatal(err) + } + createBucket(ctx) + addObjectToBucket(ctx) + run := m.Run() + removeObjectFromBucket(ctx) + deleteBucket(ctx) + os.Exit(run) +} + +func TestNewClient(t *testing.T) { + minioClient, err := NewClient(bucket.DeepCopy(), secret.DeepCopy()) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestNewClientEmptySecret(t *testing.T) { + minioClient, err := NewClient(bucket.DeepCopy(), emptySecret.DeepCopy()) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestNewClientAwsProvider(t *testing.T) { + minioClient, err := NewClient(bucketAwsProvider.DeepCopy(), nil) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestBucketExists(t *testing.T) { + ctx := context.Background() + exists, err := minioClient.BucketExists(ctx, bucketName) + assert.NilError(t, err) + assert.Assert(t, exists) +} + +func TestBucketNotExists(t *testing.T) { + ctx := context.Background() + exists, err := minioClient.BucketExists(ctx, "notexistsbucket") + assert.NilError(t, err) + assert.Assert(t, !exists) +} + +func TestFGetObject(t *testing.T) { + ctx := context.Background() + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + _, err = minioClient.FGetObject(ctx, bucketName, objectName, path) + assert.NilError(t, err) +} + +func TestFGetObjectNotExists(t *testing.T) { + ctx := context.Background() + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + badKey := "invalid.txt" + path := filepath.Join(tempDir, badKey) + _, err = 
minioClient.FGetObject(ctx, bucketName, badKey, path) + assert.Error(t, err, "The specified key does not exist.") + assert.Check(t, minioClient.ObjectIsNotFound(err)) +} + +func TestVisitObjects(t *testing.T) { + keys := []string{} + etags := []string{} + err := minioClient.VisitObjects(context.TODO(), bucketName, func(key, etag string) error { + keys = append(keys, key) + etags = append(etags, etag) + return nil + }) + assert.NilError(t, err) + assert.DeepEqual(t, keys, []string{objectName}) + assert.DeepEqual(t, etags, []string{objectEtag}) +} + +func TestVisitObjectsErr(t *testing.T) { + ctx := context.Background() + badBucketName := "bad-bucket" + err := minioClient.VisitObjects(ctx, badBucketName, func(string, string) error { + return nil + }) + assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName)) +} + +func TestVisitObjectsCallbackErr(t *testing.T) { + mockErr := fmt.Errorf("mock") + err := minioClient.VisitObjects(context.TODO(), bucketName, func(key, etag string) error { + return mockErr + }) + assert.Error(t, err, mockErr.Error()) +} + +func TestValidateSecret(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + secret *corev1.Secret + error bool + }{ + { + name: "valid secret", + secret: secret.DeepCopy(), + }, + { + name: "nil secret", + secret: nil, + }, + { + name: "invalid secret", + secret: emptySecret.DeepCopy(), + error: true, + }, + } + for _, testCase := range testCases { + tt := testCase + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + err := ValidateSecret(tt.secret) + if tt.error { + assert.Error(t, err, fmt.Sprintf("invalid '%v' secret data: required fields 'accesskey' and 'secretkey'", tt.secret.Name)) + } else { + assert.NilError(t, err) + } + }) + } +} + +func createBucket(ctx context.Context) { + if err := minioClient.Client.MakeBucket(ctx, bucketName, miniov7.MakeBucketOptions{Region: region}); err != nil { + exists, errBucketExists := minioClient.BucketExists(ctx, bucketName) + if errBucketExists == nil && exists { + deleteBucket(ctx) + } else { + log.Fatalln(err) + } + } +} + +func deleteBucket(ctx context.Context) { + if err := minioClient.Client.RemoveBucket(ctx, bucketName); err != nil { + log.Println(err) + } +} + +func addObjectToBucket(ctx context.Context) { + fileReader := strings.NewReader(getObjectFile()) + fileSize := fileReader.Size() + _, err := minioClient.Client.PutObject(ctx, bucketName, objectName, fileReader, fileSize, miniov7.PutObjectOptions{ + ContentType: "text/x-yaml", + }) + if err != nil { + log.Println(err) + } +} + +func removeObjectFromBucket(ctx context.Context) { + if err := minioClient.Client.RemoveObject(ctx, bucketName, objectName, miniov7.RemoveObjectOptions{ + GovernanceBypass: true, + }); err != nil { + log.Println(err) + } +} + +func getObjectFile() string { + return ` + apiVersion: source.toolkit.fluxcd.io/v1beta2 + kind: Bucket + metadata: + name: podinfo + namespace: default + spec: + interval: 5m + provider: aws + bucketName: podinfo + endpoint: s3.amazonaws.com + region: us-east-1 + timeout: 30s + ` +}
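
For reviewers, a minimal, illustrative sketch (not part of the patch) of how the new pieces compose: the internal/mock/s3 server standing in for a real bucket, and MinioClient doing the enumerate-and-fetch calls. It assumes a test file inside this module (the mock package is internal and not importable from outside), hypothetical bucket and object names, and that the mock server's HTTPAddress has its scheme stripped, since minio expects a bare host:port endpoint.

package minio

import (
	"context"
	"path/filepath"
	"strings"
	"testing"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
	s3mock "github.com/fluxcd/source-controller/internal/mock/s3"
)

func TestMockServerFetchSketch(t *testing.T) {
	// Serve a single object from an in-process mock bucket.
	server := s3mock.NewServer("sketch-bucket")
	server.Objects = []*s3mock.Object{
		{Key: "test.yaml", Content: []byte("data"), ContentType: "text/x-yaml"},
	}
	server.Start()
	defer server.Stop()

	// Point a Bucket spec at the mock; the endpoint must not carry a scheme.
	bucket := &sourcev1.Bucket{
		Spec: sourcev1.BucketSpec{
			BucketName: "sketch-bucket",
			Endpoint:   strings.TrimPrefix(server.HTTPAddress(), "http://"),
			Provider:   "generic",
			Insecure:   true,
		},
	}

	// A nil Secret means anonymous access, as now allowed for public buckets.
	client, err := NewClient(bucket, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close(context.Background())

	// Enumerate keys and etags, fetching each object to a local path.
	tmpDir := t.TempDir()
	err = client.VisitObjects(context.Background(), "sketch-bucket", func(key, etag string) error {
		_, ferr := client.FGetObject(context.Background(), "sketch-bucket", key, filepath.Join(tmpDir, key))
		return ferr
	})
	if err != nil {
		t.Fatal(err)
	}
}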