diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index 324cf46e..6ea57b81 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -25,17 +25,11 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" - gcpstorage "cloud.google.com/go/storage" - "github.com/fluxcd/pkg/runtime/events" - "github.com/fluxcd/source-controller/pkg/gcp" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - "github.com/minio/minio-go/v7/pkg/s3utils" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" - "google.golang.org/api/option" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -49,6 +43,7 @@ import ( "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/conditions" helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" @@ -56,9 +51,23 @@ import ( serror "github.com/fluxcd/source-controller/internal/error" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" "github.com/fluxcd/source-controller/internal/reconcile/summarize" + "github.com/fluxcd/source-controller/pkg/gcp" + "github.com/fluxcd/source-controller/pkg/minio" "github.com/fluxcd/source-controller/pkg/sourceignore" ) +// maxConcurrentBucketFetches is the upper bound on the goroutines used to +// fetch bucket objects. It's important to have a bound, to avoid +// using arbitrary amounts of memory; the actual number is chosen +// according to the queueing rule of thumb with some conservative +// parameters: +// s > Nr / T +// N (number of requestors, i.e., objects to fetch) = 10000 +// r (service time -- fetch duration) = 0.01s (~ a megabyte file over 1Gb/s) +// T (total time available) = 1s +// -> s > 100 +const maxConcurrentBucketFetches = 100 + // bucketReadyConditions contains all the conditions information needed // for Bucket Ready status conditions summary calculation. var bucketReadyConditions = summarize.Conditions{ @@ -103,9 +112,107 @@ type BucketReconcilerOptions struct { MaxConcurrentReconciles int } +// BucketProvider is an interface for fetching objects from a storage provider +// bucket. +type BucketProvider interface { + // BucketExists returns if an object storage bucket with the provided name + // exists, or returns a (client) error. + BucketExists(ctx context.Context, bucketName string) (bool, error) + // FGetObject gets the object from the provided object storage bucket, and + // writes it to targetPath. + // It returns the etag of the successfully fetched file, or any error. + FGetObject(ctx context.Context, bucketName, objectKey, targetPath string) (etag string, err error) + // VisitObjects iterates over the items in the provided object storage + // bucket, calling visit for every item. + // If the underlying client or the visit callback returns an error, + // it returns early. + VisitObjects(ctx context.Context, bucketName string, visit func(key, etag string) error) error + // ObjectIsNotFound returns true if the given error indicates an object + // could not be found. + ObjectIsNotFound(error) bool + // Close closes the provider's client, if supported. + Close(context.Context) +} + // bucketReconcilerFunc is the function type for all the bucket reconciler // functions. 
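As a quick sanity check on the bound derived above, the rule-of-thumb arithmetic can be written out as a tiny standalone snippet (illustrative only, not part of the change; the constant names mirror the comment):

```go
package main

import "fmt"

func main() {
	const (
		n = 10000 // N: objects to fetch
		r = 0.01  // r: seconds per fetch (~ a megabyte over 1Gb/s)
		t = 1.0   // T: total seconds available
	)
	// s > N*r/T -> s > 100, hence maxConcurrentBucketFetches = 100.
	fmt.Println("lower bound on concurrent fetches:", n*r/t)
}
```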
-type bucketReconcilerFunc func(ctx context.Context, obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) +type bucketReconcilerFunc func(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) + +// etagIndex is an index of storage object keys and their Etag values. +type etagIndex struct { + sync.RWMutex + index map[string]string +} + +// newEtagIndex returns a new etagIndex with an empty initialized index. +func newEtagIndex() *etagIndex { + return &etagIndex{ + index: make(map[string]string), + } +} + +func (i *etagIndex) Add(key, etag string) { + i.Lock() + defer i.Unlock() + i.index[key] = etag +} + +func (i *etagIndex) Delete(key string) { + i.Lock() + defer i.Unlock() + delete(i.index, key) +} + +func (i *etagIndex) Get(key string) string { + i.RLock() + defer i.RUnlock() + return i.index[key] +} + +func (i *etagIndex) Has(key string) bool { + i.RLock() + defer i.RUnlock() + _, ok := i.index[key] + return ok +} + +func (i *etagIndex) Index() map[string]string { + i.RLock() + defer i.RUnlock() + index := make(map[string]string) + for k, v := range i.index { + index[k] = v + } + return index +} + +func (i *etagIndex) Len() int { + i.RLock() + defer i.RUnlock() + return len(i.index) +} + +// Revision calculates the SHA256 checksum of the index. +// The keys are stable sorted, and the SHA256 sum is then calculated for the +// string representation of the key/value pairs, each pair written on a newline +// with a space between them. The sum result is returned as a string. +func (i *etagIndex) Revision() (string, error) { + i.RLock() + defer i.RUnlock() + keyIndex := make([]string, 0, len(i.index)) + for k := range i.index { + keyIndex = append(keyIndex, k) + } + + sort.Strings(keyIndex) + sum := sha256.New() + for _, k := range keyIndex { + if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i.index[k]))); err != nil { + return "", err + } + } + return fmt.Sprintf("%x", sum.Sum(nil)), nil +} func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error { return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{}) @@ -201,9 +308,6 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) } - index := make(etagIndex) - var artifact sourcev1.Artifact - // Create temp working dir tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name)) if err != nil { @@ -215,10 +319,14 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, defer os.RemoveAll(tmpDir) // Run the sub-reconcilers and build the result of reconciliation. - var res sreconcile.Result - var resErr error + var ( + res sreconcile.Result + resErr error + index = newEtagIndex() + ) + for _, rec := range reconcilers { - recResult, err := rec(ctx, obj, index, &artifact, tmpDir) + recResult, err := rec(ctx, obj, index, tmpDir) // Exit immediately on ResultRequeue. if recResult == sreconcile.ResultRequeue { return sreconcile.ResultRequeue, nil @@ -241,8 +349,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, // All artifacts for the resource except for the current one are garbage collected from the storage. // If the artifact in the Status object of the resource disappeared from storage, it is removed from the object. 
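To make the Revision() contract above concrete, here is a minimal standalone sketch of the same checksum over a hypothetical two-entry index; it mirrors the method rather than calling it:

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

func main() {
	index := map[string]string{"b.yaml": "etag2", "a.yaml": "etag1"}

	// Stable-sort the keys, then hash one "key etag\n" pair per entry.
	keys := make([]string, 0, len(index))
	for k := range index {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	sum := sha256.New()
	for _, k := range keys {
		fmt.Fprintf(sum, "%s %s\n", k, index[k])
	}
	// The hex digest is what ends up as the artifact revision.
	fmt.Printf("revision: %x\n", sum.Sum(nil))
}
```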
// If the hostname of the URLs on the object do not match the current storage server hostname, they are updated. -func (r *BucketReconciler) reconcileStorage(ctx context.Context, - obj *sourcev1.Bucket, _ etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) { +func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) { // Garbage collect previous advertised artifact(s) from storage _ = r.garbageCollect(ctx, obj) @@ -270,335 +377,84 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, // result. // If a SecretRef is defined, it attempts to fetch the Secret before calling the provider. If the fetch of the Secret // fails, it records v1beta1.FetchFailedCondition=True and returns early. -func (r *BucketReconciler) reconcileSource(ctx context.Context, - obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) { - var secret *corev1.Secret - if obj.Spec.SecretRef != nil { - secretName := types.NamespacedName{ - Namespace: obj.GetNamespace(), - Name: obj.Spec.SecretRef.Name, - } - secret = &corev1.Secret{} - if err := r.Get(ctx, secretName, secret); err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to get secret '%s': %w", secretName.String(), err), - Reason: sourcev1.AuthenticationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, e.Err.Error()) - // Return error as the world as observed may change - return sreconcile.ResultEmpty, e - } +func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) { + secret, err := r.getBucketSecret(ctx, obj) + if err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + // Return error as the world as observed may change + return sreconcile.ResultEmpty, e } + // Construct provider client + var provider BucketProvider switch obj.Spec.Provider { case sourcev1.GoogleBucketProvider: - return r.reconcileGCPSource(ctx, obj, index, artifact, secret, dir) + if err = gcp.ValidateSecret(secret); err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e + } + if provider, err = gcp.NewClient(ctx, secret); err != nil { + e := &serror.Event{Err: err, Reason: "ClientError"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e + } default: - return r.reconcileMinioSource(ctx, obj, index, artifact, secret, dir) - } -} - -// reconcileMinioSource ensures the upstream Minio client compatible bucket can be reached and downloaded from using the -// declared configuration, and observes its state. -// -// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into -// account. In case of an error during the download process (including transient errors), it records -// v1beta1.FetchFailedCondition=True and returns early. -// On a successful download, it removes v1beta1.FetchFailedCondition, and compares the current revision of HEAD to -// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ. 
-// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata. -func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, - obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, secret *corev1.Secret, dir string) (sreconcile.Result, error) { - // Build the client with the configuration from the object and secret - s3Client, err := r.buildMinioClient(obj, secret) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to construct S3 client: %w", err), - Reason: sourcev1.BucketOperationFailedReason, + if err = minio.ValidateSecret(secret); err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - // Return error as the contents of the secret may change - return sreconcile.ResultEmpty, e - } - - // Confirm bucket exists - ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) - defer cancel() - exists, err := s3Client.BucketExists(ctxTimeout, obj.Spec.BucketName) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to verify existence of bucket '%s': %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - if !exists { - e := &serror.Event{ - Err: fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - - // Look for file with ignore rules first - path := filepath.Join(dir, sourceignore.IgnoreFile) - if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil { - if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" { - e := &serror.Event{ - Err: fmt.Errorf("failed to get '%s' file: %w", sourceignore.IgnoreFile, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) + if provider, err = minio.NewClient(obj, secret); err != nil { + e := &serror.Event{Err: err, Reason: "ClientError"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) return sreconcile.ResultEmpty, e } } - ps, err := sourceignore.ReadIgnoreFile(path, nil) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to read '%s' file: %w", sourceignore.IgnoreFile, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) + + // Fetch etag index + if err = fetchEtagIndex(ctx, provider, obj, index, dir); err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.BucketOperationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) return sreconcile.ResultEmpty, e } - // In-spec patterns take precedence - if obj.Spec.Ignore != nil { - ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...) 
- } - matcher := sourceignore.NewMatcher(ps) - // Build up an index of object keys and their etags - // As the keys define the paths and the etags represent a change in file contents, this should be sufficient to - // detect both structural and file changes - for object := range s3Client.ListObjects(ctxTimeout, obj.Spec.BucketName, minio.ListObjectsOptions{ - Recursive: true, - UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()), - }) { - if err = object.Err; err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to list objects from bucket '%s': %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - - // Ignore directories and the .sourceignore file - if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile { - continue - } - // Ignore matches - if matcher.Match(strings.Split(object.Key, "/"), false) { - continue - } - - index[object.Key] = object.ETag - } - - // Calculate revision checksum from the collected index values + // Calculate revision revision, err := index.Revision() if err != nil { - ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision") return sreconcile.ResultEmpty, &serror.Event{ Err: fmt.Errorf("failed to calculate revision: %w", err), Reason: meta.FailedReason, } } - if !obj.GetArtifact().HasRevision(revision) { - // Mark observations about the revision on the object - message := fmt.Sprintf("new upstream revision '%s'", revision) - conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message) - conditions.MarkReconciling(obj, "NewRevision", message) - - // Download the files in parallel, but with a limited number of workers - group, groupCtx := errgroup.WithContext(ctx) - group.Go(func() error { - const workers = 4 - sem := semaphore.NewWeighted(workers) - for key := range index { - k := key - if err := sem.Acquire(groupCtx, 1); err != nil { - return err - } - group.Go(func() error { - defer sem.Release(1) - localPath := filepath.Join(dir, k) - if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath, minio.GetObjectOptions{}); err != nil { - return fmt.Errorf("failed to get '%s' file: %w", k, err) - } - return nil - }) - } - return nil - }) - if err = group.Wait(); err != nil { - e := &serror.Event{ - Err: fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - } - conditions.Delete(obj, sourcev1.FetchFailedCondition) - - // Create potential new artifact - *artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision)) - return sreconcile.ResultSuccess, nil -} - -// reconcileGCPSource ensures the upstream Google Cloud Storage bucket can be reached and downloaded from using the -// declared configuration, and observes its state. -// -// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into -// account. In case of an error during the download process (including transient errors), it records -// v1beta1.DownloadFailedCondition=True and returns early. 
-// On a successful download, it removes v1beta1.DownloadFailedCondition, and compares the current revision of HEAD to -// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ. -// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata. -func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, - obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, secret *corev1.Secret, dir string) (sreconcile.Result, error) { - gcpClient, err := r.buildGCPClient(ctx, secret) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to construct GCP client: %w", err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - // Return error as the contents of the secret may change - return sreconcile.ResultEmpty, e - } - defer gcpClient.Close(ctrl.LoggerFrom(ctx)) - - // Confirm bucket exists - ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) - defer cancel() - exists, err := gcpClient.BucketExists(ctxTimeout, obj.Spec.BucketName) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to verify existence of bucket '%s': %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - if !exists { - e := &serror.Event{ - Err: fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - - // Look for file with ignore rules first - path := filepath.Join(dir, sourceignore.IgnoreFile) - if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { - if err != gcpstorage.ErrObjectNotExist { - e := &serror.Event{ - Err: fmt.Errorf("failed to get '%s' file: %w", sourceignore.IgnoreFile, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - } - ps, err := sourceignore.ReadIgnoreFile(path, nil) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("failed to read '%s' file: %w", sourceignore.IgnoreFile, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - // In-spec patterns take precedence - if obj.Spec.Ignore != nil { - ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...) 
- } - matcher := sourceignore.NewMatcher(ps) - - // Build up an index of object keys and their etags - // As the keys define the paths and the etags represent a change in file contents, this should be sufficient to - // detect both structural and file changes - objects := gcpClient.ListObjects(ctxTimeout, obj.Spec.BucketName, nil) - for { - object, err := objects.Next() + // Mark observations about the revision on the object + defer func() { + // As fetchIndexFiles can make last-minute modifications to the etag + // index, we need to re-calculate the revision at the end + revision, err := index.Revision() if err != nil { - if err == gcp.IteratorDone { - break - } - e := &serror.Event{ - Err: fmt.Errorf("failed to list objects from bucket '%s': %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) - return sreconcile.ResultEmpty, e + ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision after fetching etag index") + return } - if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile { - continue + if !obj.GetArtifact().HasRevision(revision) { + message := fmt.Sprintf("new upstream revision '%s'", revision) + conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message) + conditions.MarkReconciling(obj, "NewRevision", message) } - - if matcher.Match(strings.Split(object.Name, "/"), false) { - continue - } - - index[object.Name] = object.Etag - } - - // Calculate revision checksum from the collected index values - revision, err := index.Revision() - if err != nil { - return sreconcile.ResultEmpty, &serror.Event{ - Err: fmt.Errorf("failed to calculate revision: %w", err), - Reason: meta.FailedReason, - } - } + }() if !obj.GetArtifact().HasRevision(revision) { - // Mark observations about the revision on the object - message := fmt.Sprintf("new upstream revision '%s'", revision) - conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message) - conditions.MarkReconciling(obj, "NewRevision", message) - - // Download the files in parallel, but with a limited number of workers - group, groupCtx := errgroup.WithContext(ctx) - group.Go(func() error { - const workers = 4 - sem := semaphore.NewWeighted(workers) - for key := range index { - k := key - if err := sem.Acquire(groupCtx, 1); err != nil { - return err - } - group.Go(func() error { - defer sem.Release(1) - localPath := filepath.Join(dir, k) - if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath); err != nil { - return fmt.Errorf("failed to get '%s' file: %w", k, err) - } - return nil - }) - } - return nil - }) - if err = group.Wait(); err != nil { - e := &serror.Event{ - Err: fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err), - Reason: sourcev1.BucketOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error()) + if err = fetchIndexFiles(ctx, provider, obj, index, dir); err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.BucketOperationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) return sreconcile.ResultEmpty, e } } - conditions.Delete(obj, sourcev1.FetchFailedCondition) - // Create potential new artifact - *artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision)) + conditions.Delete(obj, 
sourcev1.FetchFailedCondition) return sreconcile.ResultSuccess, nil } @@ -609,8 +465,19 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, // If the given artifact does not differ from the object's current, it returns early. // On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is // updated to its path. -func (r *BucketReconciler) reconcileArtifact(ctx context.Context, - obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) { +func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) { + // Calculate revision + revision, err := index.Revision() + if err != nil { + return sreconcile.ResultEmpty, &serror.Event{ + Err: fmt.Errorf("failed to calculate revision of new artifact: %w", err), + Reason: meta.FailedReason, + } + } + + // Create artifact + artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision)) + // Always restore the Ready condition in case it got removed due to a transient error defer func() { if obj.GetArtifact().HasRevision(artifact.Revision) { @@ -640,13 +507,13 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, } // Ensure artifact directory exists and acquire lock - if err := r.Storage.MkdirAll(*artifact); err != nil { + if err := r.Storage.MkdirAll(artifact); err != nil { return sreconcile.ResultEmpty, &serror.Event{ Err: fmt.Errorf("failed to create artifact directory: %w", err), Reason: sourcev1.StorageOperationFailedReason, } } - unlock, err := r.Storage.Lock(*artifact) + unlock, err := r.Storage.Lock(artifact) if err != nil { return sreconcile.ResultEmpty, &serror.Event{ Err: fmt.Errorf("failed to acquire lock for artifact: %w", err), @@ -656,7 +523,7 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, defer unlock() // Archive directory to storage - if err := r.Storage.Archive(artifact, dir, nil); err != nil { + if err := r.Storage.Archive(&artifact, dir, nil); err != nil { return sreconcile.ResultEmpty, &serror.Event{ Err: fmt.Errorf("unable to archive artifact to storage: %s", err), Reason: sourcev1.StorageOperationFailedReason, @@ -665,13 +532,13 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, r.annotatedEventLogf(ctx, obj, map[string]string{ "revision": artifact.Revision, "checksum": artifact.Checksum, - }, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", len(index), obj.Spec.BucketName) + }, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName) // Record it on the object obj.Status.Artifact = artifact.DeepCopy() // Update symlink on a "best effort" basis - url, err := r.Storage.Symlink(*artifact, "latest.tar.gz") + url, err := r.Storage.Symlink(artifact, "latest.tar.gz") if err != nil { r.eventLogf(ctx, obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason, "failed to update status URL symlink: %s", err) @@ -729,74 +596,21 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc return nil } -// buildMinioClient constructs a minio.Client with the data from the given object and Secret. -// It returns an error if the Secret does not have the required fields, or if there is no credential handler -// configured. 
-func (r *BucketReconciler) buildMinioClient(obj *sourcev1.Bucket, secret *corev1.Secret) (*minio.Client, error) { - opts := minio.Options{ - Region: obj.Spec.Region, - Secure: !obj.Spec.Insecure, +// getBucketSecret attempts to fetch the Secret reference if specified on the +// obj. It returns any client error. +func (r *BucketReconciler) getBucketSecret(ctx context.Context, obj *sourcev1.Bucket) (*corev1.Secret, error) { + if obj.Spec.SecretRef == nil { + return nil, nil } - if secret != nil { - var accessKey, secretKey string - if k, ok := secret.Data["accesskey"]; ok { - accessKey = string(k) - } - if k, ok := secret.Data["secretkey"]; ok { - secretKey = string(k) - } - if accessKey == "" || secretKey == "" { - return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) - } - opts.Creds = credentials.NewStaticV4(accessKey, secretKey, "") - } else if obj.Spec.Provider == sourcev1.AmazonBucketProvider { - opts.Creds = credentials.NewIAM("") + secretName := types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.Spec.SecretRef.Name, } - return minio.New(obj.Spec.Endpoint, &opts) -} - -// buildGCPClient constructs a gcp.GCPClient with the data from the given Secret. -// It returns an error if the Secret does not have the required field, or if the client construction fails. -func (r *BucketReconciler) buildGCPClient(ctx context.Context, secret *corev1.Secret) (*gcp.GCPClient, error) { - var client *gcp.GCPClient - var err error - if secret != nil { - if err := gcp.ValidateSecret(secret.Data, secret.Name); err != nil { - return nil, err - } - client, err = gcp.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"])) - if err != nil { - return nil, err - } - } else { - client, err = gcp.NewClient(ctx) - if err != nil { - return nil, err - } + secret := &corev1.Secret{} + if err := r.Get(ctx, secretName, secret); err != nil { + return nil, fmt.Errorf("failed to get secret '%s': %w", secretName.String(), err) } - return client, nil -} - -// etagIndex is an index of bucket keys and their Etag values. -type etagIndex map[string]string - -// Revision calculates the SHA256 checksum of the index. -// The keys are sorted to ensure a stable order, and the SHA256 sum is then calculated for the string representations of -// the key/value pairs, each pair written on a newline -// The sum result is returned as a string. -func (i etagIndex) Revision() (string, error) { - keyIndex := make([]string, 0, len(i)) - for k := range i { - keyIndex = append(keyIndex, k) - } - sort.Strings(keyIndex) - sum := sha256.New() - for _, k := range keyIndex { - if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i[k]))); err != nil { - return "", err - } - } - return fmt.Sprintf("%x", sum.Sum(nil)), nil + return secret, nil } // eventLogf records event and logs at the same time. @@ -819,3 +633,106 @@ func (r *BucketReconciler) annotatedEventLogf(ctx context.Context, } r.AnnotatedEventf(obj, annotations, eventType, reason, msg) } + +// fetchEtagIndex fetches the current etagIndex for the in the obj specified +// bucket using the given provider, while filtering them using .sourceignore +// rules. After fetching an object, the etag value in the index is updated to +// the current value to ensure accuracy. 
+func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error { + ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) + defer cancel() + + // Confirm bucket exists + exists, err := provider.BucketExists(ctxTimeout, obj.Spec.BucketName) + if err != nil { + return fmt.Errorf("failed to confirm existence of '%s' bucket: %w", obj.Spec.BucketName, err) + } + if !exists { + err = fmt.Errorf("bucket '%s' not found", obj.Spec.BucketName) + return err + } + + // Look for file with ignore rules first + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + if _, err := provider.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { + if !provider.ObjectIsNotFound(err) { + return err + } + } + ps, err := sourceignore.ReadIgnoreFile(path, nil) + if err != nil { + return err + } + // In-spec patterns take precedence + if obj.Spec.Ignore != nil { + ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...) + } + matcher := sourceignore.NewMatcher(ps) + + // Build up index + err = provider.VisitObjects(ctxTimeout, obj.Spec.BucketName, func(key, etag string) error { + if strings.HasSuffix(key, "/") || key == sourceignore.IgnoreFile { + return nil + } + + if matcher.Match(strings.Split(key, "/"), false) { + return nil + } + + index.Add(key, etag) + return nil + }) + if err != nil { + return fmt.Errorf("indexation of objects from bucket '%s' failed: %w", obj.Spec.BucketName, err) + } + return nil +} + +// fetchIndexFiles fetches the object files for the keys from the given etagIndex +// using the given provider, and stores them into tempDir. It downloads in +// parallel, but limited to the maxConcurrentBucketFetches. +// Given an index is provided, the bucket is assumed to exist. +func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error { + ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) + defer cancel() + + // Download in parallel, but bound the concurrency. According to + // AWS and GCP docs, rate limits are either soft or don't exist: + // - https://cloud.google.com/storage/quotas + // - https://docs.aws.amazon.com/general/latest/gr/s3.html + // .. so, the limiting factor is this process keeping a small footprint. 
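The fan-out that follows is easier to read in isolation. A hedged sketch of the same errgroup/semaphore shape with made-up names (fetchAll, fetch, keys), not the actual implementation: the outer group goroutine keeps the group non-empty while it acquires slots and spawns workers, so a single Wait() covers both the producer and every fetch.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

// fetchAll runs fetch(key) for every key, with at most `limit` in flight.
func fetchAll(ctx context.Context, limit int64, keys []string, fetch func(string) error) error {
	group, groupCtx := errgroup.WithContext(ctx)
	group.Go(func() error {
		sem := semaphore.NewWeighted(limit)
		for _, key := range keys {
			k := key
			// Blocks once `limit` fetches are already running.
			if err := sem.Acquire(groupCtx, 1); err != nil {
				return err
			}
			group.Go(func() error {
				defer sem.Release(1)
				return fetch(k)
			})
		}
		return nil
	})
	// Returns the first error from either the producer or any worker.
	return group.Wait()
}

func main() {
	err := fetchAll(context.Background(), 2, []string{"a", "b", "c", "d"}, func(k string) error {
		fmt.Println("fetched", k)
		return nil
	})
	fmt.Println("err:", err)
}
```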
+ group, groupCtx := errgroup.WithContext(ctx) + group.Go(func() error { + sem := semaphore.NewWeighted(maxConcurrentBucketFetches) + for key, etag := range index.Index() { + k := key + t := etag + if err := sem.Acquire(groupCtx, 1); err != nil { + return err + } + group.Go(func() error { + defer sem.Release(1) + localPath := filepath.Join(tempDir, k) + etag, err := provider.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath) + if err != nil { + if provider.ObjectIsNotFound(err) { + ctrl.LoggerFrom(ctx).Info(fmt.Sprintf("indexed object '%s' disappeared from '%s' bucket", k, obj.Spec.BucketName)) + index.Delete(k) + return nil + } + return fmt.Errorf("failed to get '%s' object: %w", k, err) + } + if t != etag { + index.Add(k, etag) + } + return nil + }) + } + return nil + }) + if err := group.Wait(); err != nil { + return fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err) + } + + return nil +} diff --git a/controllers/bucket_controller_fetch_test.go b/controllers/bucket_controller_fetch_test.go new file mode 100644 index 00000000..acaa7e74 --- /dev/null +++ b/controllers/bucket_controller_fetch_test.go @@ -0,0 +1,322 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "gotest.tools/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" +) + +type mockBucketObject struct { + etag string + data string +} + +type mockBucketClient struct { + bucketName string + objects map[string]mockBucketObject +} + +var mockNotFound = fmt.Errorf("not found") + +func (m mockBucketClient) BucketExists(_ context.Context, name string) (bool, error) { + return name == m.bucketName, nil +} + +func (m mockBucketClient) FGetObject(_ context.Context, bucket, obj, path string) (string, error) { + if bucket != m.bucketName { + return "", fmt.Errorf("bucket does not exist") + } + // tiny bit of protocol, for convenience: if asked for an object "error", then return an error. 
+ if obj == "error" { + return "", fmt.Errorf("I was asked to report an error") + } + object, ok := m.objects[obj] + if !ok { + return "", mockNotFound + } + if err := os.WriteFile(path, []byte(object.data), os.FileMode(0660)); err != nil { + return "", err + } + return object.etag, nil +} + +func (m mockBucketClient) ObjectIsNotFound(e error) bool { + return e == mockNotFound +} + +func (m mockBucketClient) VisitObjects(_ context.Context, _ string, f func(key, etag string) error) error { + for key, obj := range m.objects { + if err := f(key, obj.etag); err != nil { + return err + } + } + return nil +} + +func (m mockBucketClient) Close(_ context.Context) { + return +} + +func (m *mockBucketClient) addObject(key string, object mockBucketObject) { + if m.objects == nil { + m.objects = make(map[string]mockBucketObject) + } + m.objects[key] = object +} + +func (m *mockBucketClient) objectsToEtagIndex() *etagIndex { + i := newEtagIndex() + for k, v := range m.objects { + i.Add(k, v.etag) + } + return i +} + +func Test_fetchEtagIndex(t *testing.T) { + bucketName := "all-my-config" + + bucket := sourcev1.Bucket{ + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Timeout: &metav1.Duration{Duration: 1 * time.Hour}, + }, + } + + t.Run("fetches etag index", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"}) + client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"}) + client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"}) + + index := newEtagIndex() + err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, index.Len(), 3) + }) + + t.Run("an error while bucket does not exist", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: "other-bucket-name"} + + index := newEtagIndex() + err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) + assert.ErrorContains(t, err, "not found") + }) + + t.Run("filters with .sourceignore rules", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject(".sourceignore", mockBucketObject{etag: "sourceignore1", data: `*.txt`}) + client.addObject("foo.yaml", mockBucketObject{etag: "etag1", data: "foo.yaml"}) + client.addObject("foo.txt", mockBucketObject{etag: "etag2", data: "foo.txt"}) + + index := newEtagIndex() + err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + + if _, err := os.Stat(filepath.Join(tmp, ".sourceignore")); err != nil { + t.Error(err) + } + + if ok := index.Has("foo.txt"); ok { + t.Error(fmt.Errorf("expected 'foo.txt' index item to not exist")) + } + assert.Equal(t, index.Len(), 1) + }) + + t.Run("filters with ignore rules from object", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject(".sourceignore", mockBucketObject{etag: "sourceignore1", data: `*.txt`}) + client.addObject("foo.txt", mockBucketObject{etag: "etag1", 
data: "foo.txt"}) + + ignore := "!*.txt" + bucket := bucket.DeepCopy() + bucket.Spec.Ignore = &ignore + + index := newEtagIndex() + err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + + if _, err := os.Stat(filepath.Join(tmp, ".sourceignore")); err != nil { + t.Error(err) + } + + assert.Equal(t, index.Len(), 1) + if ok := index.Has("foo.txt"); !ok { + t.Error(fmt.Errorf("expected 'foo.txt' index item to exist")) + } + }) +} + +func Test_fetchFiles(t *testing.T) { + bucketName := "all-my-config" + + bucket := sourcev1.Bucket{ + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Timeout: &metav1.Duration{Duration: 1 * time.Hour}, + }, + } + + t.Run("fetches files", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"}) + client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"}) + client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"}) + + index := client.objectsToEtagIndex() + + err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + + for path := range index.Index() { + p := filepath.Join(tmp, path) + _, err := os.Stat(p) + if err != nil { + t.Error(err) + } + } + }) + + t.Run("an error while fetching returns an error for the whole procedure", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName, objects: map[string]mockBucketObject{}} + client.objects["error"] = mockBucketObject{} + + err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), client.objectsToEtagIndex(), tmp) + if err == nil { + t.Fatal("expected error but got nil") + } + }) + + t.Run("a changed etag updates the index", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag2"}) + + index := newEtagIndex() + index.Add("foo.yaml", "etag1") + err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + f := index.Get("foo.yaml") + assert.Equal(t, f, "etag2") + }) + + t.Run("a disappeared index entry is removed from the index", func(t *testing.T) { + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"}) + + index := newEtagIndex() + index.Add("foo.yaml", "etag1") + // Does not exist on server + index.Add("bar.yaml", "etag2") + + err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + f := index.Get("foo.yaml") + assert.Equal(t, f, "etag1") + assert.Check(t, !index.Has("bar.yaml")) + }) + + t.Run("can fetch more than maxConcurrentFetches", func(t *testing.T) { + // this will fail if, for example, the semaphore is not used correctly and blocks + tmp, err := os.MkdirTemp("", "test-bucket") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + client := mockBucketClient{bucketName: bucketName} + for i 
:= 0; i < 2*maxConcurrentBucketFetches; i++ { + f := fmt.Sprintf("file-%d", i) + client.addObject(f, mockBucketObject{etag: f, data: f}) + } + index := client.objectsToEtagIndex() + + err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp) + if err != nil { + t.Fatal(err) + } + }) +} diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go index 3ff729f3..060b6e12 100644 --- a/controllers/bucket_controller_test.go +++ b/controllers/bucket_controller_test.go @@ -18,14 +18,10 @@ package controllers import ( "context" - "crypto/md5" - "encoding/json" "fmt" "net/http" - "net/http/httptest" "net/url" "os" - "path" "path/filepath" "strings" "testing" @@ -36,7 +32,6 @@ import ( "github.com/fluxcd/pkg/runtime/conditions" "github.com/fluxcd/pkg/runtime/patch" . "github.com/onsi/gomega" - raw "google.golang.org/api/storage/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,17 +41,19 @@ import ( fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + gcsmock "github.com/fluxcd/source-controller/internal/mock/gcs" + s3mock "github.com/fluxcd/source-controller/internal/mock/s3" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" ) // Environment variable to set the GCP Storage host for the GCP client. -const ENV_GCP_STORAGE_HOST = "STORAGE_EMULATOR_HOST" +const EnvGcpStorageHost = "STORAGE_EMULATOR_HOST" func TestBucketReconciler_Reconcile(t *testing.T) { g := NewWithT(t) - s3Server := newS3Server("test-bucket") - s3Server.Objects = []*s3MockObject{ + s3Server := s3mock.NewServer("test-bucket") + s3Server.Objects = []*s3mock.Object{ { Key: "test.yaml", Content: []byte("test"), @@ -274,10 +271,9 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed()) } - index := make(etagIndex) - var artifact sourcev1.Artifact + index := newEtagIndex() - got, err := r.reconcileStorage(context.TODO(), obj, index, &artifact, "") + got, err := r.reconcileStorage(context.TODO(), obj, index, "") g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(got).To(Equal(tt.want)) @@ -299,23 +295,23 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) { } } -func TestBucketReconciler_reconcileMinioSource(t *testing.T) { +func TestBucketReconciler_reconcileSource_generic(t *testing.T) { tests := []struct { name string bucketName string - bucketObjects []*s3MockObject + bucketObjects []*s3mock.Object middleware http.Handler secret *corev1.Secret beforeFunc func(obj *sourcev1.Bucket) want sreconcile.Result wantErr bool - assertArtifact sourcev1.Artifact + assertIndex *etagIndex assertConditions []metav1.Condition }{ { - name: "reconciles source", + name: "Reconciles GCS source", bucketName: "dummy", - bucketObjects: []*s3MockObject{ + bucketObjects: []*s3mock.Object{ { Key: "test.txt", Content: []byte("test"), @@ -324,13 +320,14 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz", - Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8", + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, 
"NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, // TODO(hidde): middleware for mock server @@ -339,20 +336,21 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { // bucketName: "dummy", //}, { - name: "observes non-existing secretRef", + name: "Observes non-existing secretRef", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.SecretRef = &meta.LocalObjectReference{ Name: "dummy", } }, - wantErr: true, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"), }, }, { - name: "observes invalid secretRef", + name: "Observes invalid secretRef", bucketName: "dummy", secret: &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -364,38 +362,40 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { Name: "dummy", } }, - wantErr: true, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to construct S3 client: invalid 'dummy' secret data: required fields"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid 'dummy' secret data: required fields 'accesskey' and 'secretkey'"), }, }, { - name: "observes non-existing bucket name", + name: "Observes non-existing bucket name", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.BucketName = "invalid" }, - wantErr: true, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' does not exist"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' not found"), }, }, { - name: "transient bucket name API failure", + name: "Transient bucket name API failure", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.Endpoint = "transient.example.com" obj.Spec.BucketName = "unavailable" }, - wantErr: true, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to verify existence of bucket 'unavailable'"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to confirm existence of 'unavailable' bucket"), }, }, { - // TODO(hidde): test the lesser happy paths name: ".sourceignore", bucketName: "dummy", - bucketObjects: []*s3MockObject{ + bucketObjects: []*s3mock.Object{ { Key: ".sourceignore", Content: []byte("ignored/file.txt"), @@ -416,23 +416,24 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: 
"bucket/test-bucket/94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100.tar.gz", - Revision: "94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100", + assertIndex: &etagIndex{ + index: map[string]string{ + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), }, }, { name: "spec.ignore overrides .sourceignore", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { - ignore := "included/file.txt" + ignore := "!ignored/file.txt" obj.Spec.Ignore = &ignore }, - bucketObjects: []*s3MockObject{ + bucketObjects: []*s3mock.Object{ { Key: ".sourceignore", Content: []byte("ignored/file.txt"), @@ -453,24 +454,26 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.tar.gz", - Revision: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + assertIndex: &etagIndex{ + index: map[string]string{ + "ignored/file.txt": "f08907038338288420ae7dc2d30c0497", + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), }, }, { - name: "up-to-date artifact", + name: "Up-to-date artifact", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Status.Artifact = &sourcev1.Artifact{ - Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8", + Revision: "b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479", } }, - bucketObjects: []*s3MockObject{ + bucketObjects: []*s3mock.Object{ { Key: "test.txt", Content: []byte("test"), @@ -479,9 +482,10 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz", - Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8", + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{}, }, @@ -491,7 +495,7 @@ func 
TestBucketReconciler_reconcileMinioSource(t *testing.T) { beforeFunc: func(obj *sourcev1.Bucket) { conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to read test file") }, - bucketObjects: []*s3MockObject{ + bucketObjects: []*s3mock.Object{ { Key: "test.txt", Content: []byte("test"), @@ -500,13 +504,14 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz", - Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8", + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, } @@ -539,9 +544,9 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { }, } - var server *s3MockServer + var server *s3mock.Server if tt.bucketName != "" { - server = newS3Server(tt.bucketName) + server = s3mock.NewServer(tt.bucketName) server.Objects = tt.bucketObjects server.Start() defer server.Stop() @@ -559,38 +564,39 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) { tt.beforeFunc(obj) } - artifact := &sourcev1.Artifact{} - index := make(etagIndex) - got, err := r.reconcileSource(context.TODO(), obj, index, artifact, tmpDir) + index := newEtagIndex() + + got, err := r.reconcileSource(context.TODO(), obj, index, tmpDir) g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(got).To(Equal(tt.want)) - g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy())) + g.Expect(index.Index()).To(Equal(tt.assertIndex.Index())) g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) }) } } -func TestBucketReconciler_reconcileGCPSource(t *testing.T) { +func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { tests := []struct { name string bucketName string - bucketObjects []*gcpMockObject + bucketObjects []*gcsmock.Object secret *corev1.Secret beforeFunc func(obj *sourcev1.Bucket) want sreconcile.Result wantErr bool - assertArtifact sourcev1.Artifact + assertIndex *etagIndex assertConditions []metav1.Condition }{ { - name: "reconciles source", + name: "Reconciles GCS source", bucketName: "dummy", - bucketObjects: []*gcpMockObject{ + bucketObjects: []*gcsmock.Object{ { Key: "test.txt", ContentType: "text/plain", Content: []byte("test"), + Generation: 3, }, }, secret: &corev1.Secret{ @@ -609,31 +615,33 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) { } }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8.tar.gz", - Revision: "23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8", + assertIndex: &etagIndex{ + index: 
map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, { - name: "observes non-existing secretRef", + name: "Observes non-existing secretRef", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.SecretRef = &meta.LocalObjectReference{ Name: "dummy", } }, - want: sreconcile.ResultEmpty, - wantErr: true, + want: sreconcile.ResultEmpty, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"), }, }, { - name: "observes invalid secretRef", + name: "Observes invalid secretRef", bucketName: "dummy", secret: &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -645,119 +653,133 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) { Name: "dummy", } }, - want: sreconcile.ResultEmpty, - wantErr: true, + want: sreconcile.ResultEmpty, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to construct GCP client: invalid 'dummy' secret data: required fields"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid 'dummy' secret data: required fields"), }, }, { - name: "observes non-existing bucket name", + name: "Observes non-existing bucket name", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.BucketName = "invalid" }, - want: sreconcile.ResultEmpty, - wantErr: true, + want: sreconcile.ResultEmpty, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' does not exist"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' not found"), }, }, { - name: "transient bucket name API failure", + name: "Transient bucket name API failure", beforeFunc: func(obj *sourcev1.Bucket) { obj.Spec.Endpoint = "transient.example.com" obj.Spec.BucketName = "unavailable" }, - want: sreconcile.ResultEmpty, - wantErr: true, + want: sreconcile.ResultEmpty, + wantErr: true, + assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to verify existence of bucket 'unavailable'"), + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to confirm existence of 'unavailable' bucket"), }, }, { name: ".sourceignore", bucketName: "dummy", - bucketObjects: []*gcpMockObject{ + bucketObjects: []*gcsmock.Object{ { Key: ".sourceignore", 
Content: []byte("ignored/file.txt"), ContentType: "text/plain", + Generation: 1, }, { Key: "ignored/file.txt", Content: []byte("ignored/file.txt"), ContentType: "text/plain", + Generation: 4, }, { Key: "included/file.txt", Content: []byte("included/file.txt"), ContentType: "text/plain", + Generation: 3, }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4.tar.gz", - Revision: "7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4", + assertIndex: &etagIndex{ + index: map[string]string{ + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), }, }, { name: "spec.ignore overrides .sourceignore", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { - ignore := "included/file.txt" + ignore := "!ignored/file.txt" obj.Spec.Ignore = &ignore }, - bucketObjects: []*gcpMockObject{ + bucketObjects: []*gcsmock.Object{ { Key: ".sourceignore", Content: []byte("ignored/file.txt"), ContentType: "text/plain", + Generation: 1, }, { Key: "ignored/file.txt", Content: []byte("ignored/file.txt"), ContentType: "text/plain", + Generation: 2, }, { Key: "included/file.txt", Content: []byte("included/file.txt"), ContentType: "text/plain", + Generation: 4, }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.tar.gz", - Revision: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + assertIndex: &etagIndex{ + index: map[string]string{ + "ignored/file.txt": "f08907038338288420ae7dc2d30c0497", + "included/file.txt": "5a4bc7048b3301f677fe15b8678be2f8", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), }, }, { - name: "up-to-date artifact", + name: "Up-to-date artifact", bucketName: "dummy", beforeFunc: func(obj *sourcev1.Bucket) { obj.Status.Artifact = &sourcev1.Artifact{ - Revision: "23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8", + Revision: "b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479", } }, - bucketObjects: []*gcpMockObject{ + bucketObjects: []*gcsmock.Object{ 
{ Key: "test.txt", Content: []byte("test"), ContentType: "text/plain", + Generation: 2, }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8.tar.gz", - Revision: "23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8", + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{}, }, @@ -767,21 +789,23 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) { beforeFunc: func(obj *sourcev1.Bucket) { conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to read test file") }, - bucketObjects: []*gcpMockObject{ + bucketObjects: []*gcsmock.Object{ { Key: "test.txt", Content: []byte("test"), ContentType: "text/plain", + Generation: 2, }, }, want: sreconcile.ResultSuccess, - assertArtifact: sourcev1.Artifact{ - Path: "bucket/test-bucket/23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8.tar.gz", - Revision: "23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8", + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"), + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, // TODO: Middleware for mock server to test authentication using secret. @@ -819,7 +843,7 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) { } // Set up the mock GCP bucket server. - server := newGCPServer(tt.bucketName) + server := gcsmock.NewServer(tt.bucketName) server.Objects = tt.bucketObjects server.Start() defer server.Stop() @@ -834,31 +858,28 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) { } // Set the GCP storage host to be used by the GCP client. - g.Expect(os.Setenv(ENV_GCP_STORAGE_HOST, obj.Spec.Endpoint)).ToNot(HaveOccurred()) + g.Expect(os.Setenv(EnvGcpStorageHost, obj.Spec.Endpoint)).ToNot(HaveOccurred()) defer func() { - g.Expect(os.Unsetenv(ENV_GCP_STORAGE_HOST)).ToNot(HaveOccurred()) + g.Expect(os.Unsetenv(EnvGcpStorageHost)).ToNot(HaveOccurred()) }() - artifact := &sourcev1.Artifact{} - index := make(etagIndex) - got, err := r.reconcileSource(context.TODO(), obj, index, artifact, tmpDir) + index := newEtagIndex() + + got, err := r.reconcileSource(context.TODO(), obj, index, tmpDir) + t.Log(err) g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(got).To(Equal(tt.want)) - g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy())) + g.Expect(index.Index()).To(Equal(tt.assertIndex.Index())) g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) }) } } func TestBucketReconciler_reconcileArtifact(t *testing.T) { - // testChecksum is the checksum value of the artifacts created in this - // test. 
- const testChecksum = "4f4fb700ef54461cfa02571ae0db9a0dc1e0cdb5577484a6d75e68dc38e8acc1" - tests := []struct { name string - beforeFunc func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) + beforeFunc func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) afterFunc func(t *WithT, obj *sourcev1.Bucket, dir string) want sreconcile.Result wantErr bool @@ -866,42 +887,45 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }{ { name: "Archiving artifact to storage makes Ready=True", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { obj.Spec.Interval = metav1.Duration{Duration: interval} }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"), + *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), }, }, { - name: "Up-to-date artifact should not update status", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + name: "Up-to-date artifact should not persist and update status", + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { + revision, _ := index.Revision() obj.Spec.Interval = metav1.Duration{Duration: interval} - obj.Status.Artifact = artifact.DeepCopy() + // Incomplete artifact + obj.Status.Artifact = &sourcev1.Artifact{Revision: revision} }, afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) { + // Still incomplete t.Expect(obj.Status.URL).To(BeEmpty()) }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"), + *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), }, }, { name: "Removes ArtifactOutdatedCondition after creating a new artifact", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { obj.Spec.Interval = metav1.Duration{Duration: interval} conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "Foo", "") }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"), + *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), }, }, { name: "Creates latest symlink to the created artifact", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { obj.Spec.Interval = metav1.Duration{Duration: interval} }, afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) { @@ -913,12 +937,12 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }, want: sreconcile.ResultSuccess, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"), + 
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"), }, }, { name: "Dir path deleted", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred()) }, want: sreconcile.ResultEmpty, @@ -926,7 +950,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }, { name: "Dir path is not a directory", - beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) { // Remove the given directory and create a file for the same // path. t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred()) @@ -969,23 +993,18 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { }, } - index := make(etagIndex) - artifact := testStorage.NewArtifactFor(obj.Kind, obj, "existing", "foo.tar.gz") - artifact.Checksum = testChecksum + index := newEtagIndex() if tt.beforeFunc != nil { - tt.beforeFunc(g, obj, artifact, tmpDir) + tt.beforeFunc(g, obj, index, tmpDir) } - got, err := r.reconcileArtifact(context.TODO(), obj, index, &artifact, tmpDir) + got, err := r.reconcileArtifact(context.TODO(), obj, index, tmpDir) g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(got).To(Equal(tt.want)) // On error, artifact is empty. Check artifacts only on successful // reconcile. - if !tt.wantErr { - g.Expect(obj.Status.Artifact).To(MatchArtifact(artifact.DeepCopy())) - } g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) if tt.afterFunc != nil { @@ -998,7 +1017,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) { func Test_etagIndex_Revision(t *testing.T) { tests := []struct { name string - list etagIndex + list map[string]string want string wantErr bool }{ @@ -1009,7 +1028,7 @@ func Test_etagIndex_Revision(t *testing.T) { "two": "two", "three": "three", }, - want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc", + want: "c0837b3f32bb67c5275858fdb96595f87801cf3c2f622c049918a051d29b2c7f", }, { name: "index with items in different order", @@ -1018,7 +1037,7 @@ func Test_etagIndex_Revision(t *testing.T) { "one": "one", "two": "two", }, - want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc", + want: "c0837b3f32bb67c5275858fdb96595f87801cf3c2f622c049918a051d29b2c7f", }, { name: "empty index", @@ -1033,7 +1052,8 @@ func Test_etagIndex_Revision(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := tt.list.Revision() + index := &etagIndex{index: tt.list} + got, err := index.Revision() if (err != nil) != tt.wantErr { t.Errorf("revision() error = %v, wantErr %v", err, tt.wantErr) return @@ -1044,277 +1064,3 @@ func Test_etagIndex_Revision(t *testing.T) { }) } } - -// helpers - -func mockFile(root, path, content string) error { - filePath := filepath.Join(root, path) - if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil { - panic(err) - } - if err := os.WriteFile(filePath, []byte(content), 0644); err != nil { - panic(err) - } - return nil -} - -type s3MockObject struct { - Key string - LastModified time.Time - ContentType string - Content []byte -} - -type s3MockServer struct { - srv *httptest.Server - mux *http.ServeMux - - BucketName string - Objects []*s3MockObject -} - -func newS3Server(bucketName string) 
*s3MockServer {
-	s := &s3MockServer{BucketName: bucketName}
-	s.mux = http.NewServeMux()
-	s.mux.Handle(fmt.Sprintf("/%s/", s.BucketName), http.HandlerFunc(s.handler))
-
-	s.srv = httptest.NewUnstartedServer(s.mux)
-
-	return s
-}
-
-func (s *s3MockServer) Start() {
-	s.srv.Start()
-}
-
-func (s *s3MockServer) Stop() {
-	s.srv.Close()
-}
-
-func (s *s3MockServer) HTTPAddress() string {
-	return s.srv.URL
-}
-
-func (s *s3MockServer) handler(w http.ResponseWriter, r *http.Request) {
-	key := path.Base(r.URL.Path)
-
-	switch key {
-	case s.BucketName:
-		w.Header().Add("Content-Type", "application/xml")
-
-		if r.Method == http.MethodHead {
-			return
-		}
-
-		q := r.URL.Query()
-
-		if q["location"] != nil {
-			fmt.Fprint(w, `
-<?xml version="1.0" encoding="UTF-8"?>
-<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">Europe</LocationConstraint>
-			`)
-			return
-		}
-
-		contents := ""
-		for _, o := range s.Objects {
-			etag := md5.Sum(o.Content)
-			contents += fmt.Sprintf(`
-		<Contents>
-			<Key>%s</Key>
-			<LastModified>%s</LastModified>
-			<Size>%d</Size>
-			<ETag>"%b"</ETag>
-			<StorageClass>STANDARD</StorageClass>
-		</Contents>`, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag)
-		}
-
-		fmt.Fprintf(w, `
-<?xml version="1.0" encoding="UTF-8"?>
-<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
-	<Name>%s</Name>
-	<Prefix/>
-	<Marker/>
-	<KeyCount>%d</KeyCount>
-	<MaxKeys>1000</MaxKeys>
-	<IsTruncated>false</IsTruncated>
-	%s
-</ListBucketResult>
-		`, s.BucketName, len(s.Objects), contents)
-	default:
-		key, err := filepath.Rel("/"+s.BucketName, r.URL.Path)
-		if err != nil {
-			w.WriteHeader(500)
-			return
-		}
-
-		var found *s3MockObject
-		for _, o := range s.Objects {
-			if key == o.Key {
-				found = o
-			}
-		}
-		if found == nil {
-			w.WriteHeader(404)
-			return
-		}
-
-		etag := md5.Sum(found.Content)
-		lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1)
-
-		w.Header().Add("Content-Type", found.ContentType)
-		w.Header().Add("Last-Modified", lastModified)
-		w.Header().Add("ETag", fmt.Sprintf("\"%b\"", etag))
-		w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content)))
-
-		if r.Method == http.MethodHead {
-			return
-		}
-
-		w.Write(found.Content)
-	}
-}
-
-type gcpMockObject struct {
-	Key         string
-	ContentType string
-	Content     []byte
-}
-
-type gcpMockServer struct {
-	srv *httptest.Server
-	mux *http.ServeMux
-
-	BucketName string
-	Etag       string
-	Objects    []*gcpMockObject
-	Close      func()
-}
-
-func newGCPServer(bucketName string) *gcpMockServer {
-	s := &gcpMockServer{BucketName: bucketName}
-	s.mux = http.NewServeMux()
-	s.mux.Handle("/", http.HandlerFunc(s.handler))
-
-	s.srv = httptest.NewUnstartedServer(s.mux)
-
-	return s
-}
-
-func (gs *gcpMockServer) Start() {
-	gs.srv.Start()
-}
-
-func (gs *gcpMockServer) Stop() {
-	gs.srv.Close()
-}
-
-func (gs *gcpMockServer) HTTPAddress() string {
-	return gs.srv.URL
-}
-
-func (gs *gcpMockServer) GetAllObjects() *raw.Objects {
-	objs := &raw.Objects{}
-	for _, o := range gs.Objects {
-		objs.Items = append(objs.Items, getGCPObject(gs.BucketName, *o))
-	}
-	return objs
-}
-
-func (gs *gcpMockServer) GetObjectFile(key string) ([]byte, error) {
-	for _, o := range gs.Objects {
-		if o.Key == key {
-			return o.Content, nil
-		}
-	}
-	return nil, fmt.Errorf("not found")
-}
-
-func (gs *gcpMockServer) handler(w http.ResponseWriter, r *http.Request) {
-	if strings.HasPrefix(r.RequestURI, "/b/") {
-		// Handle the bucket info related queries.
-		if r.RequestURI == fmt.Sprintf("/b/%s?alt=json&prettyPrint=false&projection=full", gs.BucketName) {
-			// Return info about the bucket.
-			response := getGCPBucket(gs.BucketName, gs.Etag)
-			jsonResponse, err := json.Marshal(response)
-			if err != nil {
-				w.WriteHeader(500)
-				return
-			}
-			w.WriteHeader(200)
-			w.Write(jsonResponse)
-			return
-		} else if strings.Contains(r.RequestURI, "/o/") {
-			// Return info about object in the bucket.
- var obj *gcpMockObject - for _, o := range gs.Objects { - // The object key in the URI is escaped. - // e.g.: /b/dummy/o/included%2Ffile.txt?alt=json&prettyPrint=false&projection=full - if r.RequestURI == fmt.Sprintf("/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", gs.BucketName, url.QueryEscape(o.Key)) { - obj = o - } - } - if obj != nil { - response := getGCPObject(gs.BucketName, *obj) - jsonResponse, err := json.Marshal(response) - if err != nil { - w.WriteHeader(500) - return - } - w.WriteHeader(200) - w.Write(jsonResponse) - return - } - w.WriteHeader(404) - return - } else if strings.Contains(r.RequestURI, "/o?") { - // Return info about all the objects in the bucket. - response := gs.GetAllObjects() - jsonResponse, err := json.Marshal(response) - if err != nil { - w.WriteHeader(500) - return - } - w.WriteHeader(200) - w.Write(jsonResponse) - return - } - w.WriteHeader(404) - return - } else { - // Handle object file query. - bucketPrefix := fmt.Sprintf("/%s/", gs.BucketName) - if strings.HasPrefix(r.RequestURI, bucketPrefix) { - // The URL path is of the format //included/file.txt. - // Extract the object key by discarding the bucket prefix. - key := strings.TrimPrefix(r.URL.Path, bucketPrefix) - // Handle returning object file in a bucket. - response, err := gs.GetObjectFile(key) - if err != nil { - w.WriteHeader(404) - return - } - w.WriteHeader(200) - w.Write(response) - return - } - w.WriteHeader(404) - return - } -} - -func getGCPObject(bucket string, obj gcpMockObject) *raw.Object { - return &raw.Object{ - Bucket: bucket, - Name: obj.Key, - ContentType: obj.ContentType, - } -} - -func getGCPBucket(name, eTag string) *raw.Bucket { - return &raw.Bucket{ - Name: name, - Location: "loc", - Etag: eTag, - } -} diff --git a/go.mod b/go.mod index 499fc12d..64879a7e 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/go-git/go-billy/v5 v5.3.1 github.com/go-git/go-git/v5 v5.4.2 github.com/go-logr/logr v1.2.2 + github.com/google/uuid v1.3.0 github.com/libgit2/git2go/v33 v33.0.6 github.com/minio/minio-go/v7 v7.0.15 github.com/onsi/gomega v1.17.0 @@ -97,7 +98,6 @@ require ( github.com/google/go-cmp v0.5.7 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/gorilla/mux v1.8.0 // indirect diff --git a/internal/mock/gcs/server.go b/internal/mock/gcs/server.go new file mode 100644 index 00000000..b8b1cd92 --- /dev/null +++ b/internal/mock/gcs/server.go @@ -0,0 +1,220 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gcs + +import ( + "crypto/md5" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "strings" + + raw "google.golang.org/api/storage/v1" +) + +var ( + ObjectNotFound = errors.New("object not found") +) + +// Object is a mock Server object. 
+type Object struct { + Key string + Generation int64 + MetaGeneration int64 + ContentType string + Content []byte +} + +// Server is a simple Google Cloud Storage mock server. +// It serves the provided Objects for the BucketName on the HTTPAddress when +// Start or StartTLS is called. +// It provides primitive support "Generation Conditions" when Object contents +// are fetched. +// Ref: https://pkg.go.dev/cloud.google.com/go/storage#hdr-Conditions +type Server struct { + srv *httptest.Server + mux *http.ServeMux + + BucketName string + Objects []*Object +} + +func NewServer(bucketName string) *Server { + s := &Server{BucketName: bucketName} + s.mux = http.NewServeMux() + s.mux.Handle("/", http.HandlerFunc(s.handler)) + + s.srv = httptest.NewUnstartedServer(s.mux) + + return s +} + +func (s *Server) Start() { + s.srv.Start() +} + +func (s *Server) StartTLS(config *tls.Config) { + s.srv.TLS = config + s.srv.StartTLS() +} + +func (s *Server) Stop() { + s.srv.Close() +} + +func (s *Server) HTTPAddress() string { + return s.srv.URL +} + +func (s *Server) getAllObjects() *raw.Objects { + objs := &raw.Objects{} + for _, o := range s.Objects { + objs.Items = append(objs.Items, getGCSObject(s.BucketName, *o)) + } + return objs +} + +func (s *Server) getObjectFile(key string, generation int64) ([]byte, error) { + for _, o := range s.Objects { + if o.Key == key { + if generation == 0 || generation == o.Generation { + return o.Content, nil + } + } + } + return nil, ObjectNotFound +} + +func (s *Server) handler(w http.ResponseWriter, r *http.Request) { + switch { + // Handle Bucket metadata related queries + case strings.HasPrefix(r.RequestURI, "/b/"): + switch { + // Return metadata about the Bucket + case r.RequestURI == fmt.Sprintf("/b/%s?alt=json&prettyPrint=false&projection=full", s.BucketName): + etag := md5.New() + for _, v := range s.Objects { + etag.Write(v.Content) + } + response := getGCSBucket(s.BucketName, fmt.Sprintf("%x", etag.Sum(nil))) + jsonResponse, err := json.Marshal(response) + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Write(jsonResponse) + return + // Return metadata about a Bucket object + case strings.Contains(r.RequestURI, "/o/"): + var obj *Object + for _, o := range s.Objects { + // The object key in the URI is escaped. + // e.g.: /b/dummy/o/included%2Ffile.txt?alt=json&prettyPrint=false&projection=full + if r.RequestURI == fmt.Sprintf("/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", s.BucketName, url.QueryEscape(o.Key)) { + obj = o + break + } + } + if obj != nil { + response := getGCSObject(s.BucketName, *obj) + jsonResponse, err := json.Marshal(response) + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Write(jsonResponse) + return + } + w.WriteHeader(404) + return + // Return metadata about all objects in the Bucket + case strings.Contains(r.RequestURI, "/o?"): + response := s.getAllObjects() + jsonResponse, err := json.Marshal(response) + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Write(jsonResponse) + return + default: + w.WriteHeader(404) + return + } + // Handle object file query + default: + bucketPrefix := fmt.Sprintf("/%s/", s.BucketName) + if strings.HasPrefix(r.RequestURI, bucketPrefix) { + // The URL path is of the format //included/file.txt. + // Extract the object key by discarding the bucket prefix. 
+ key := strings.TrimPrefix(r.URL.Path, bucketPrefix) + + // Support "Generation Conditions" + // https://pkg.go.dev/cloud.google.com/go/storage#hdr-Conditions + var generation int64 + if matchGeneration := r.URL.Query().Get("ifGenerationMatch"); matchGeneration != "" { + var err error + if generation, err = strconv.ParseInt(matchGeneration, 10, 64); err != nil { + w.WriteHeader(500) + return + } + } + + // Handle returning object file in a bucket. + response, err := s.getObjectFile(key, generation) + if err != nil { + w.WriteHeader(404) + return + } + w.WriteHeader(200) + w.Write(response) + return + } + w.WriteHeader(404) + return + } +} + +func getGCSObject(bucket string, obj Object) *raw.Object { + hash := md5.Sum(obj.Content) + etag := fmt.Sprintf("%x", hash) + return &raw.Object{ + Bucket: bucket, + Name: obj.Key, + ContentType: obj.ContentType, + Generation: obj.Generation, + Metageneration: obj.MetaGeneration, + Md5Hash: etag, + Etag: etag, + } +} + +func getGCSBucket(name, eTag string) *raw.Bucket { + return &raw.Bucket{ + Name: name, + Location: "loc", + Etag: eTag, + } +} diff --git a/internal/mock/s3/server.go b/internal/mock/s3/server.go new file mode 100644 index 00000000..904f1942 --- /dev/null +++ b/internal/mock/s3/server.go @@ -0,0 +1,157 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package s3 + +import ( + "crypto/md5" + "crypto/tls" + "fmt" + "net/http" + "net/http/httptest" + "path" + "path/filepath" + "strings" + "time" +) + +// Object is a mock Server object. +type Object struct { + Key string + LastModified time.Time + ContentType string + Content []byte +} + +// Server is a simple AWS S3 mock server. +// It serves the provided Objects for the BucketName on the HTTPAddress when +// Start or StartTLS is called. 
+type Server struct {
+	srv *httptest.Server
+	mux *http.ServeMux
+
+	BucketName string
+	Objects    []*Object
+}
+
+func NewServer(bucketName string) *Server {
+	s := &Server{BucketName: bucketName}
+	s.mux = http.NewServeMux()
+	s.mux.Handle("/", http.HandlerFunc(s.handler))
+
+	s.srv = httptest.NewUnstartedServer(s.mux)
+
+	return s
+}
+
+func (s *Server) Start() {
+	s.srv.Start()
+}
+
+func (s *Server) StartTLS(config *tls.Config) {
+	s.srv.TLS = config
+	s.srv.StartTLS()
+}
+
+func (s *Server) Stop() {
+	s.srv.Close()
+}
+
+func (s *Server) HTTPAddress() string {
+	return s.srv.URL
+}
+
+func (s *Server) handler(w http.ResponseWriter, r *http.Request) {
+	key := path.Base(r.URL.Path)
+
+	switch key {
+	case s.BucketName:
+		w.Header().Add("Content-Type", "application/xml")
+
+		if r.Method == http.MethodHead {
+			w.WriteHeader(200)
+			return
+		}
+
+		if r.URL.Query().Has("location") {
+			w.WriteHeader(200)
+			w.Write([]byte(`
+<?xml version="1.0" encoding="UTF-8"?>
+<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">Europe</LocationConstraint>
+			`))
+			return
+		}
+
+		contents := ""
+		for _, o := range s.Objects {
+			etag := md5.Sum(o.Content)
+			contents += fmt.Sprintf(`
+		<Contents>
+			<Key>%s</Key>
+			<LastModified>%s</LastModified>
+			<Size>%d</Size>
+			<ETag>"%x"</ETag>
+			<StorageClass>STANDARD</StorageClass>
+		</Contents>`, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag)
+		}
+
+		fmt.Fprintf(w, `
+<?xml version="1.0" encoding="UTF-8"?>
+<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+	<Name>%s</Name>
+	<Prefix/>
+	<Marker/>
+	<KeyCount>%d</KeyCount>
+	<MaxKeys>1000</MaxKeys>
+	<IsTruncated>false</IsTruncated>
+	%s
+</ListBucketResult>
+		`, s.BucketName, len(s.Objects), contents)
+	default:
+		key, err := filepath.Rel("/"+s.BucketName, r.URL.Path)
+		if err != nil {
+			w.WriteHeader(500)
+			return
+		}
+
+		var found *Object
+		for _, o := range s.Objects {
+			if key == o.Key {
+				found = o
+			}
+		}
+		if found == nil {
+			w.WriteHeader(404)
+			return
+		}
+
+		etag := md5.Sum(found.Content)
+		lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1)
+
+		w.Header().Add("Content-Type", found.ContentType)
+		w.Header().Add("Last-Modified", lastModified)
+		w.Header().Add("ETag", fmt.Sprintf("\"%x\"", etag))
+		w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content)))
+
+		if r.Method == http.MethodHead {
+			w.WriteHeader(200)
+			return
+		}
+
+		w.WriteHeader(200)
+		w.Write(found.Content)
+	}
+}
diff --git a/pkg/gcp/gcp.go b/pkg/gcp/gcp.go
index f98e498c..836ba341 100644
--- a/pkg/gcp/gcp.go
+++ b/pkg/gcp/gcp.go
@@ -28,6 +28,8 @@ import (
 	"github.com/go-logr/logr"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
+	corev1 "k8s.io/api/core/v1"
+	ctrl "sigs.k8s.io/controller-runtime"
 )
 
 var (
@@ -37,12 +39,10 @@ var (
 	// ErrorDirectoryExists is an error returned when the filename provided
 	// is a directory.
 	ErrorDirectoryExists = errors.New("filename is a directory")
-	// ErrorObjectDoesNotExist is an error returned when the object whose name
-	// is provided does not exist.
-	ErrorObjectDoesNotExist = errors.New("object does not exist")
 )
 
-type GCPClient struct {
+// GCSClient is a minimal Google Cloud Storage client for fetching objects.
+type GCSClient struct {
 	// client for interacting with the Google Cloud
 	// Storage APIs.
 	*gcpstorage.Client
@@ -50,27 +50,39 @@
 // NewClient creates a new GCP storage client. The Client will automatically look for the Google Application
 // Credential environment variable or look for the Google Application Credential file.
-func NewClient(ctx context.Context, opts ...option.ClientOption) (*GCPClient, error) {
-	client, err := gcpstorage.NewClient(ctx, opts...)
- if err != nil { - return nil, err +func NewClient(ctx context.Context, secret *corev1.Secret) (*GCSClient, error) { + c := &GCSClient{} + if secret != nil { + client, err := gcpstorage.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"])) + if err != nil { + return nil, err + } + c.Client = client + } else { + client, err := gcpstorage.NewClient(ctx) + if err != nil { + return nil, err + } + c.Client = client } - - return &GCPClient{Client: client}, nil + return c, nil } -// ValidateSecret validates the credential secrets -// It ensures that needed secret fields are not missing. -func ValidateSecret(secret map[string][]byte, name string) error { - if _, exists := secret["serviceaccount"]; !exists { - return fmt.Errorf("invalid '%s' secret data: required fields 'serviceaccount'", name) +// ValidateSecret validates the credential secret. The provided Secret may +// be nil. +func ValidateSecret(secret *corev1.Secret) error { + if secret == nil { + return nil + } + if _, exists := secret.Data["serviceaccount"]; !exists { + return fmt.Errorf("invalid '%s' secret data: required fields 'serviceaccount'", secret.Name) } - return nil } -// BucketExists checks if the bucket with the provided name exists. -func (c *GCPClient) BucketExists(ctx context.Context, bucketName string) (bool, error) { +// BucketExists returns if an object storage bucket with the provided name +// exists, or returns a (client) error. +func (c *GCSClient) BucketExists(ctx context.Context, bucketName string) (bool, error) { _, err := c.Client.Bucket(bucketName).Attrs(ctx) if err == gcpstorage.ErrBucketNotExist { // Not returning error to be compatible with minio's API. @@ -82,34 +94,23 @@ func (c *GCPClient) BucketExists(ctx context.Context, bucketName string) (bool, return true, nil } -// ObjectExists checks if the object with the provided name exists. -func (c *GCPClient) ObjectExists(ctx context.Context, bucketName, objectName string) (bool, error) { - _, err := c.Client.Bucket(bucketName).Object(objectName).Attrs(ctx) - // ErrObjectNotExist is returned if the object does not exist - if err == gcpstorage.ErrObjectNotExist { - return false, err - } - if err != nil { - return false, err - } - return true, nil -} - -// FGetObject gets the object from the bucket and downloads the object locally -func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) error { +// FGetObject gets the object from the provided object storage bucket, and +// writes it to targetPath. +// It returns the etag of the successfully fetched file, or any error. +func (c *GCSClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) (string, error) { // Verify if destination already exists. dirStatus, err := os.Stat(localPath) if err == nil { // If the destination exists and is a directory. if dirStatus.IsDir() { - return ErrorDirectoryExists + return "", ErrorDirectoryExists } } // Proceed if file does not exist. return for all other errors. if err != nil { if !os.IsNotExist(err) { - return err + return "", err } } @@ -118,56 +119,79 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca if objectDir != "" { // Create any missing top level directories. if err := os.MkdirAll(objectDir, 0700); err != nil { - return err + return "", err } } - // ObjectExists verifies if object exists and you have permission to access. - // Check if the object exists and if you have permission to access it. 
- exists, err := c.ObjectExists(ctx, bucketName, objectName) + // Get Object attributes. + objAttr, err := c.Client.Bucket(bucketName).Object(objectName).Attrs(ctx) if err != nil { - return err - } - if !exists { - return ErrorObjectDoesNotExist + return "", err } + // Prepare target file. objectFile, err := os.OpenFile(localPath, os.O_CREATE|os.O_WRONLY, 0600) if err != nil { - return err + return "", err } - // Get Object from GCP Bucket - objectReader, err := c.Client.Bucket(bucketName).Object(objectName).NewReader(ctx) + // Get Object data. + objectReader, err := c.Client.Bucket(bucketName).Object(objectName).If(gcpstorage.Conditions{ + GenerationMatch: objAttr.Generation, + }).NewReader(ctx) if err != nil { - return err + return "", err } - defer objectReader.Close() + defer func() { + if err = objectReader.Close(); err != nil { + ctrl.LoggerFrom(ctx).Error(err, "failed to close object reader") + } + }() // Write Object to file. if _, err := io.Copy(objectFile, objectReader); err != nil { - return err + return "", err } // Close the file. if err := objectFile.Close(); err != nil { - return err + return "", err } + return objAttr.Etag, nil +} + +// VisitObjects iterates over the items in the provided object storage +// bucket, calling visit for every item. +// If the underlying client or the visit callback returns an error, +// it returns early. +func (c *GCSClient) VisitObjects(ctx context.Context, bucketName string, visit func(path, etag string) error) error { + items := c.Client.Bucket(bucketName).Objects(ctx, nil) + for { + object, err := items.Next() + if err == IteratorDone { + break + } + if err != nil { + err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err) + return err + } + if err = visit(object.Name, object.Etag); err != nil { + return err + } + } return nil } -// ListObjects lists the objects/contents of the bucket whose bucket name is provided. -// the objects are returned as an Objectiterator and .Next() has to be called on them -// to loop through the Objects. -func (c *GCPClient) ListObjects(ctx context.Context, bucketName string, query *gcpstorage.Query) *gcpstorage.ObjectIterator { - items := c.Client.Bucket(bucketName).Objects(ctx, query) - return items -} - -// Close closes the GCP Client and logs any useful errors -func (c *GCPClient) Close(log logr.Logger) { +// Close closes the GCP Client and logs any useful errors. +func (c *GCSClient) Close(ctx context.Context) { + log := logr.FromContextOrDiscard(ctx) if err := c.Client.Close(); err != nil { - log.Error(err, "GCP Provider") + log.Error(err, "closing GCP client") } } + +// ObjectIsNotFound checks if the error provided is storage.ErrObjectNotExist. +func (c *GCSClient) ObjectIsNotFound(err error) bool { + return errors.Is(err, gcpstorage.ErrObjectNotExist) +} diff --git a/pkg/gcp/gcp_test.go b/pkg/gcp/gcp_test.go index 6c27accf..ded00a32 100644 --- a/pkg/gcp/gcp_test.go +++ b/pkg/gcp/gcp_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package gcp_test +package gcp import ( "context" @@ -32,17 +32,20 @@ import ( "time" gcpstorage "cloud.google.com/go/storage" - "github.com/fluxcd/source-controller/pkg/gcp" "google.golang.org/api/googleapi" raw "google.golang.org/api/storage/v1" "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "google.golang.org/api/option" ) const ( - bucketName string = "test-bucket" - objectName string = "test.yaml" + bucketName string = "test-bucket" + objectName string = "test.yaml" + objectGeneration int64 = 3 + objectEtag string = "bFbHCDvedeecefdgmfmhfuRxBdcedGe96S82XJOAXxjJpk=" ) var ( @@ -50,12 +53,34 @@ var ( client *gcpstorage.Client close func() err error + secret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "serviceaccount": []byte("ewogICAgInR5cGUiOiAic2VydmljZV9hY2NvdW50IiwKICAgICJwcm9qZWN0X2lkIjogInBvZGluZm8iLAogICAgInByaXZhdGVfa2V5X2lkIjogIjI4cXdnaDNnZGY1aGozZ2I1ZmozZ3N1NXlmZ2gzNGY0NTMyNDU2OGh5MiIsCiAgICAicHJpdmF0ZV9rZXkiOiAiLS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tXG5Id2V0aGd5MTIzaHVnZ2hoaGJkY3U2MzU2ZGd5amhzdmd2R0ZESFlnY2RqYnZjZGhic3g2M2Ncbjc2dGd5Y2ZlaHVoVkdURllmdzZ0N3lkZ3lWZ3lkaGV5aHVnZ3ljdWhland5NnQzNWZ0aHl1aGVndmNldGZcblRGVUhHVHlnZ2h1Ymh4ZTY1eWd0NnRneWVkZ3kzMjZodWN5dnN1aGJoY3Zjc2poY3NqaGNzdmdkdEhGQ0dpXG5IY3llNnR5eWczZ2Z5dWhjaGNzYmh5Z2NpamRiaHl5VEY2NnR1aGNldnVoZGNiaHVoaHZmdGN1aGJoM3VoN3Q2eVxuZ2d2ZnRVSGJoNnQ1cmZ0aGh1R1ZSdGZqaGJmY3JkNXI2N3l1aHV2Z0ZUWWpndnRmeWdoYmZjZHJoeWpoYmZjdGZkZnlodmZnXG50Z3ZnZ3RmeWdodmZ0NnR1Z3ZURjVyNjZ0dWpoZ3ZmcnR5aGhnZmN0Nnk3eXRmcjVjdHZnaGJoaHZ0Z2hoanZjdHRmeWNmXG5mZnhmZ2hqYnZnY2d5dDY3dWpiZ3ZjdGZ5aFZDN3VodmdjeWp2aGhqdnl1amNcbmNnZ2hndmdjZmhnZzc2NTQ1NHRjZnRoaGdmdHloaHZ2eXZ2ZmZnZnJ5eXU3N3JlcmVkc3dmdGhoZ2ZjZnR5Y2ZkcnR0ZmhmL1xuLS0tLS1FTkQgUFJJVkFURSBLRVktLS0tLVxuIiwKICAgICJjbGllbnRfZW1haWwiOiAidGVzdEBwb2RpbmZvLmlhbS5nc2VydmljZWFjY291bnQuY29tIiwKICAgICJjbGllbnRfaWQiOiAiMzI2NTc2MzQ2Nzg3NjI1MzY3NDYiLAogICAgImF1dGhfdXJpIjogImh0dHBzOi8vYWNjb3VudHMuZ29vZ2xlLmNvbS9vL29hdXRoMi9hdXRoIiwKICAgICJ0b2tlbl91cmkiOiAiaHR0cHM6Ly9vYXV0aDIuZ29vZ2xlYXBpcy5jb20vdG9rZW4iLAogICAgImF1dGhfcHJvdmlkZXJfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9vYXV0aDIvdjEvY2VydHMiLAogICAgImNsaWVudF94NTA5X2NlcnRfdXJsIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL3JvYm90L3YxL21ldGFkYXRhL3g1MDkvdGVzdCU0MHBvZGluZm8uaWFtLmdzZXJ2aWNlYWNjb3VudC5jb20iCn0="), + }, + Type: "Opaque", + } + badSecret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "gcp-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "username": []byte("test-user"), + }, + Type: "Opaque", + } ) func TestMain(m *testing.M) { hc, close = newTestServer(func(w http.ResponseWriter, r *http.Request) { io.Copy(io.Discard, r.Body) - if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s?alt=json&prettyPrint=false&projection=full", bucketName) { + + switch r.RequestURI { + case fmt.Sprintf("/storage/v1/b/%s?alt=json&prettyPrint=false&projection=full", bucketName): w.WriteHeader(200) response := getBucket() jsonResponse, err := json.Marshal(response) @@ -66,7 +91,7 @@ func TestMain(m *testing.M) { if err != nil { log.Fatalf("error writing jsonResponse %v\n", err) } - } else if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName) { + case fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName): w.WriteHeader(200) response := getObject() jsonResponse, err := json.Marshal(response) @@ -77,9 +102,10 @@ 
func TestMain(m *testing.M) { if err != nil { log.Fatalf("error writing jsonResponse %v\n", err) } - } else if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o?alt=json&delimiter=&endOffset=&pageToken=&prefix=&prettyPrint=false&projection=full&startOffset=&versions=false", bucketName) { + case fmt.Sprintf("/storage/v1/b/%s/o?alt=json&delimiter=&endOffset=&pageToken=&prefix=&prettyPrint=false&projection=full&startOffset=&versions=false", bucketName): w.WriteHeader(200) - response := getObject() + response := &raw.Objects{} + response.Items = append(response.Items, getObject()) jsonResponse, err := json.Marshal(response) if err != nil { log.Fatalf("error marshalling response %v\n", err) @@ -88,14 +114,16 @@ func TestMain(m *testing.M) { if err != nil { log.Fatalf("error writing jsonResponse %v\n", err) } - } else if r.RequestURI == fmt.Sprintf("/%s/test.yaml", bucketName) || r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName) { + case fmt.Sprintf("/%s/test.yaml", bucketName), + fmt.Sprintf("/%s/test.yaml?ifGenerationMatch=%d", bucketName, objectGeneration), + fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName): w.WriteHeader(200) response := getObjectFile() _, err = w.Write([]byte(response)) if err != nil { log.Fatalf("error writing response %v\n", err) } - } else { + default: w.WriteHeader(404) } }) @@ -109,14 +137,15 @@ func TestMain(m *testing.M) { os.Exit(run) } -func TestNewClient(t *testing.T) { - gcpClient, err := gcp.NewClient(context.Background(), option.WithHTTPClient(hc)) - assert.NilError(t, err) - assert.Assert(t, gcpClient != nil) +func TestNewClientWithSecretErr(t *testing.T) { + gcpClient, err := NewClient(context.Background(), secret.DeepCopy()) + t.Log(err) + assert.Error(t, err, "dialing: invalid character 'e' looking for beginning of value") + assert.Assert(t, gcpClient == nil) } func TestBucketExists(t *testing.T) { - gcpClient := &gcp.GCPClient{ + gcpClient := &GCSClient{ Client: client, } exists, err := gcpClient.BucketExists(context.Background(), bucketName) @@ -126,7 +155,7 @@ func TestBucketExists(t *testing.T) { func TestBucketNotExists(t *testing.T) { bucket := "notexistsbucket" - gcpClient := &gcp.GCPClient{ + gcpClient := &GCSClient{ Client: client, } exists, err := gcpClient.BucketExists(context.Background(), bucket) @@ -134,55 +163,57 @@ func TestBucketNotExists(t *testing.T) { assert.Assert(t, !exists) } -func TestObjectExists(t *testing.T) { - gcpClient := &gcp.GCPClient{ +func TestVisitObjects(t *testing.T) { + gcpClient := &GCSClient{ Client: client, } - exists, err := gcpClient.ObjectExists(context.Background(), bucketName, objectName) - if err == gcpstorage.ErrObjectNotExist { - assert.NilError(t, err) - } + keys := []string{} + etags := []string{} + err := gcpClient.VisitObjects(context.Background(), bucketName, func(key, etag string) error { + keys = append(keys, key) + etags = append(etags, etag) + return nil + }) assert.NilError(t, err) - assert.Assert(t, exists) + assert.DeepEqual(t, keys, []string{objectName}) + assert.DeepEqual(t, etags, []string{objectEtag}) } -func TestObjectNotExists(t *testing.T) { - object := "doesnotexists.yaml" - gcpClient := &gcp.GCPClient{ +func TestVisitObjectsErr(t *testing.T) { + gcpClient := &GCSClient{ Client: client, } - exists, err := gcpClient.ObjectExists(context.Background(), bucketName, object) - assert.Error(t, err, gcpstorage.ErrObjectNotExist.Error()) - assert.Assert(t, !exists) + 
badBucketName := "bad-bucket" + err := gcpClient.VisitObjects(context.Background(), badBucketName, func(key, etag string) error { + return nil + }) + assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName)) } -func TestListObjects(t *testing.T) { - gcpClient := &gcp.GCPClient{ +func TestVisitObjectsCallbackErr(t *testing.T) { + gcpClient := &GCSClient{ Client: client, } - objectIterator := gcpClient.ListObjects(context.Background(), bucketName, nil) - for { - _, err := objectIterator.Next() - if err == gcp.IteratorDone { - break - } - assert.NilError(t, err) - } - assert.Assert(t, objectIterator != nil) + mockErr := fmt.Errorf("mock") + err := gcpClient.VisitObjects(context.Background(), bucketName, func(key, etag string) error { + return mockErr + }) + assert.Error(t, err, mockErr.Error()) } func TestFGetObject(t *testing.T) { tempDir, err := os.MkdirTemp("", bucketName) assert.NilError(t, err) defer os.RemoveAll(tempDir) - gcpClient := &gcp.GCPClient{ + gcpClient := &GCSClient{ Client: client, } localPath := filepath.Join(tempDir, objectName) - err = gcpClient.FGetObject(context.Background(), bucketName, objectName, localPath) + etag, err := gcpClient.FGetObject(context.Background(), bucketName, objectName, localPath) if err != io.EOF { assert.NilError(t, err) } + assert.Equal(t, etag, objectEtag) } func TestFGetObjectNotExists(t *testing.T) { @@ -190,24 +221,25 @@ func TestFGetObjectNotExists(t *testing.T) { tempDir, err := os.MkdirTemp("", bucketName) assert.NilError(t, err) defer os.RemoveAll(tempDir) - gcpClient := &gcp.GCPClient{ + gcsClient := &GCSClient{ Client: client, } localPath := filepath.Join(tempDir, object) - err = gcpClient.FGetObject(context.Background(), bucketName, object, localPath) + _, err = gcsClient.FGetObject(context.Background(), bucketName, object, localPath) if err != io.EOF { assert.Error(t, err, "storage: object doesn't exist") + assert.Check(t, gcsClient.ObjectIsNotFound(err)) } } func TestFGetObjectDirectoryIsFileName(t *testing.T) { tempDir, err := os.MkdirTemp("", bucketName) - defer os.RemoveAll(tempDir) assert.NilError(t, err) - gcpClient := &gcp.GCPClient{ + defer os.RemoveAll(tempDir) + gcpClient := &GCSClient{ Client: client, } - err = gcpClient.FGetObject(context.Background(), bucketName, objectName, tempDir) + _, err = gcpClient.FGetObject(context.Background(), bucketName, objectName, tempDir) if err != io.EOF { assert.Error(t, err, "filename is a directory") } @@ -216,35 +248,27 @@ func TestFGetObjectDirectoryIsFileName(t *testing.T) { func TestValidateSecret(t *testing.T) { t.Parallel() testCases := []struct { - title string - secret map[string][]byte name string + secret *corev1.Secret error bool }{ { - "Test Case 1", - map[string][]byte{ - "serviceaccount": []byte("serviceaccount"), - }, - "Service Account", - false, + name: "valid secret", + secret: secret.DeepCopy(), }, { - "Test Case 2", - map[string][]byte{ - "data": []byte("data"), - }, - "Service Account", - true, + name: "invalid secret", + secret: badSecret.DeepCopy(), + error: true, }, } for _, testCase := range testCases { - testCase := testCase - t.Run(testCase.title, func(t *testing.T) { + tt := testCase + t.Run(tt.name, func(t *testing.T) { t.Parallel() - err := gcp.ValidateSecret(testCase.secret, testCase.name) - if testCase.error { - assert.Error(t, err, fmt.Sprintf("invalid '%v' secret data: required fields 'serviceaccount'", testCase.name)) + err := ValidateSecret(tt.secret) + if tt.error { + assert.Error(t, 
err, fmt.Sprintf("invalid '%v' secret data: required fields 'serviceaccount'", tt.secret.Name)) } else { assert.NilError(t, err) } @@ -280,7 +304,10 @@ func getObject() *raw.Object { ContentLanguage: "en-us", Size: 1 << 20, CustomTime: customTime.Format(time.RFC3339), - Md5Hash: "bFbHCDvedeecefdgmfmhfuRxBdcedGe96S82XJOAXxjJpk=", + Generation: objectGeneration, + Metageneration: 3, + Etag: objectEtag, + Md5Hash: objectEtag, } } diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go new file mode 100644 index 00000000..f1930dbd --- /dev/null +++ b/pkg/minio/minio.go @@ -0,0 +1,135 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package minio + +import ( + "context" + "errors" + "fmt" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/minio/minio-go/v7/pkg/s3utils" + corev1 "k8s.io/api/core/v1" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" +) + +// MinioClient is a minimal Minio client for fetching files from S3 compatible +// storage APIs. +type MinioClient struct { + *minio.Client +} + +// NewClient creates a new Minio storage client. +func NewClient(bucket *sourcev1.Bucket, secret *corev1.Secret) (*MinioClient, error) { + opt := minio.Options{ + Region: bucket.Spec.Region, + Secure: !bucket.Spec.Insecure, + BucketLookup: minio.BucketLookupPath, + } + + if secret != nil { + var accessKey, secretKey string + if k, ok := secret.Data["accesskey"]; ok { + accessKey = string(k) + } + if k, ok := secret.Data["secretkey"]; ok { + secretKey = string(k) + } + if accessKey != "" && secretKey != "" { + opt.Creds = credentials.NewStaticV4(accessKey, secretKey, "") + } + } else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider { + opt.Creds = credentials.NewIAM("") + } + + client, err := minio.New(bucket.Spec.Endpoint, &opt) + if err != nil { + return nil, err + } + return &MinioClient{Client: client}, nil +} + +// ValidateSecret validates the credential secret. The provided Secret may +// be nil. +func ValidateSecret(secret *corev1.Secret) error { + if secret == nil { + return nil + } + err := fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) + if _, ok := secret.Data["accesskey"]; !ok { + return err + } + if _, ok := secret.Data["secretkey"]; !ok { + return err + } + return nil +} + +// FGetObject gets the object from the provided object storage bucket, and +// writes it to targetPath. +// It returns the etag of the successfully fetched file, or any error. 
+func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) (string, error) { + stat, err := c.Client.StatObject(ctx, bucketName, objectName, minio.GetObjectOptions{}) + if err != nil { + return "", err + } + opts := minio.GetObjectOptions{} + if err = opts.SetMatchETag(stat.ETag); err != nil { + return "", err + } + if err = c.Client.FGetObject(ctx, bucketName, objectName, localPath, opts); err != nil { + return "", err + } + return stat.ETag, nil +} + +// VisitObjects iterates over the items in the provided object storage +// bucket, calling visit for every item. +// If the underlying client or the visit callback returns an error, +// it returns early. +func (c *MinioClient) VisitObjects(ctx context.Context, bucketName string, visit func(key, etag string) error) error { + for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{ + Recursive: true, + UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()), + }) { + if object.Err != nil { + err := fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, object.Err) + return err + } + + if err := visit(object.Key, object.ETag); err != nil { + return err + } + } + return nil +} + +// ObjectIsNotFound checks if the error provided is a minio.ErrResponse +// with "NoSuchKey" code. +func (c *MinioClient) ObjectIsNotFound(err error) bool { + if resp := new(minio.ErrorResponse); errors.As(err, resp) { + return resp.Code == "NoSuchKey" + } + return false +} + +// Close closes the Minio Client and logs any useful errors. +func (c *MinioClient) Close(_ context.Context) { + // Minio client does not provide a close method +} diff --git a/pkg/minio/minio_test.go b/pkg/minio/minio_test.go new file mode 100644 index 00000000..d391b127 --- /dev/null +++ b/pkg/minio/minio_test.go @@ -0,0 +1,283 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package minio + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/fluxcd/pkg/apis/meta" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + "github.com/fluxcd/source-controller/pkg/sourceignore" + + "github.com/google/uuid" + miniov7 "github.com/minio/minio-go/v7" + "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + objectName string = "test.yaml" + objectEtag string = "2020beab5f1711919157756379622d1d" + region string = "us-east-1" +) + +var ( + minioClient *MinioClient + bucketName = "test-bucket-minio" + uuid.New().String() + secret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "accesskey": []byte("Q3AM3UQ867SPQQA43P2F"), + "secretkey": []byte("zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"), + }, + Type: "Opaque", + } + emptySecret = corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-secret", + Namespace: "default", + }, + Data: map[string][]byte{}, + Type: "Opaque", + } + bucket = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "generic", + Insecure: true, + SecretRef: &meta.LocalObjectReference{ + Name: secret.Name, + }, + }, + } + bucketAwsProvider = sourcev1.Bucket{ + ObjectMeta: v1.ObjectMeta{ + Name: "minio-test-bucket", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + BucketName: bucketName, + Endpoint: "play.min.io", + Region: region, + Provider: "aws", + Insecure: true, + }, + } +) + +func TestMain(m *testing.M) { + var err error + ctx := context.Background() + minioClient, err = NewClient(bucket.DeepCopy(), secret.DeepCopy()) + if err != nil { + log.Fatal(err) + } + createBucket(ctx) + addObjectToBucket(ctx) + run := m.Run() + removeObjectFromBucket(ctx) + deleteBucket(ctx) + os.Exit(run) +} + +func TestNewClient(t *testing.T) { + minioClient, err := NewClient(bucket.DeepCopy(), secret.DeepCopy()) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestNewClientEmptySecret(t *testing.T) { + minioClient, err := NewClient(bucket.DeepCopy(), emptySecret.DeepCopy()) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestNewClientAwsProvider(t *testing.T) { + minioClient, err := NewClient(bucketAwsProvider.DeepCopy(), nil) + assert.NilError(t, err) + assert.Assert(t, minioClient != nil) +} + +func TestBucketExists(t *testing.T) { + ctx := context.Background() + exists, err := minioClient.BucketExists(ctx, bucketName) + assert.NilError(t, err) + assert.Assert(t, exists) +} + +func TestBucketNotExists(t *testing.T) { + ctx := context.Background() + exists, err := minioClient.BucketExists(ctx, "notexistsbucket") + assert.NilError(t, err) + assert.Assert(t, !exists) +} + +func TestFGetObject(t *testing.T) { + ctx := context.Background() + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + path := filepath.Join(tempDir, sourceignore.IgnoreFile) + _, err = minioClient.FGetObject(ctx, bucketName, objectName, path) + assert.NilError(t, err) +} + +func TestFGetObjectNotExists(t *testing.T) { + ctx := context.Background() + tempDir, err := os.MkdirTemp("", bucketName) + assert.NilError(t, err) + defer os.RemoveAll(tempDir) + badKey := "invalid.txt" + path := filepath.Join(tempDir, badKey) + _, err = 
minioClient.FGetObject(ctx, bucketName, badKey, path) + assert.Error(t, err, "The specified key does not exist.") + assert.Check(t, minioClient.ObjectIsNotFound(err)) +} + +func TestVisitObjects(t *testing.T) { + keys := []string{} + etags := []string{} + err := minioClient.VisitObjects(context.TODO(), bucketName, func(key, etag string) error { + keys = append(keys, key) + etags = append(etags, etag) + return nil + }) + assert.NilError(t, err) + assert.DeepEqual(t, keys, []string{objectName}) + assert.DeepEqual(t, etags, []string{objectEtag}) +} + +func TestVisitObjectsErr(t *testing.T) { + ctx := context.Background() + badBucketName := "bad-bucket" + err := minioClient.VisitObjects(ctx, badBucketName, func(string, string) error { + return nil + }) + assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName)) +} + +func TestVisitObjectsCallbackErr(t *testing.T) { + mockErr := fmt.Errorf("mock") + err := minioClient.VisitObjects(context.TODO(), bucketName, func(key, etag string) error { + return mockErr + }) + assert.Error(t, err, mockErr.Error()) +} + +func TestValidateSecret(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + secret *corev1.Secret + error bool + }{ + { + name: "valid secret", + secret: secret.DeepCopy(), + }, + { + name: "nil secret", + secret: nil, + }, + { + name: "invalid secret", + secret: emptySecret.DeepCopy(), + error: true, + }, + } + for _, testCase := range testCases { + tt := testCase + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + err := ValidateSecret(tt.secret) + if tt.error { + assert.Error(t, err, fmt.Sprintf("invalid '%v' secret data: required fields 'accesskey' and 'secretkey'", tt.secret.Name)) + } else { + assert.NilError(t, err) + } + }) + } +} + +func createBucket(ctx context.Context) { + if err := minioClient.Client.MakeBucket(ctx, bucketName, miniov7.MakeBucketOptions{Region: region}); err != nil { + exists, errBucketExists := minioClient.BucketExists(ctx, bucketName) + if errBucketExists == nil && exists { + deleteBucket(ctx) + } else { + log.Fatalln(err) + } + } +} + +func deleteBucket(ctx context.Context) { + if err := minioClient.Client.RemoveBucket(ctx, bucketName); err != nil { + log.Println(err) + } +} + +func addObjectToBucket(ctx context.Context) { + fileReader := strings.NewReader(getObjectFile()) + fileSize := fileReader.Size() + _, err := minioClient.Client.PutObject(ctx, bucketName, objectName, fileReader, fileSize, miniov7.PutObjectOptions{ + ContentType: "text/x-yaml", + }) + if err != nil { + log.Println(err) + } +} + +func removeObjectFromBucket(ctx context.Context) { + if err := minioClient.Client.RemoveObject(ctx, bucketName, objectName, miniov7.RemoveObjectOptions{ + GovernanceBypass: true, + }); err != nil { + log.Println(err) + } +} + +func getObjectFile() string { + return ` + apiVersion: source.toolkit.fluxcd.io/v1beta2 + kind: Bucket + metadata: + name: podinfo + namespace: default + spec: + interval: 5m + provider: aws + bucketName: podinfo + endpoint: s3.amazonaws.com + region: us-east-1 + timeout: 30s + ` +}
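Note for readers cross-checking the fixture values in the tests above: both mock servers derive an object's etag from the MD5 sum of its content, which is where the "098f6bcd4621d373cade4e832627b4f6" etag for the []byte("test") object comes from, and the revision strings in the asserted conditions are produced from the resulting index by etagIndex.Revision(). Below is a minimal standalone sketch, not part of this change set, assuming the revision is the SHA256 over the sorted "<key> <etag>\n" lines of the index:

package main

import (
	"crypto/md5"
	"crypto/sha256"
	"fmt"
)

func main() {
	// Etag as the mock servers compute it: hex-encoded MD5 of the object content.
	etag := fmt.Sprintf("%x", md5.Sum([]byte("test"))) // 098f6bcd4621d373cade4e832627b4f6

	// Assumed revision scheme: SHA256 over the "<key> <etag>\n" lines of the
	// index, keys sorted (a single key here, so sorting is a no-op).
	revision := sha256.Sum256([]byte(fmt.Sprintf("%s %s\n", "test.txt", etag)))
	fmt.Printf("%x\n", revision)
}

Under that assumption, running the sketch should print the 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479' revision asserted in the single-object test cases above.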