Merge pull request #596 from fluxcd/bucket-provider-interface-dev

Hidde Beydals 2022-03-01 10:31:10 +01:00 committed by GitHub
commit a4012f2022
10 changed files with 1771 additions and 940 deletions


@ -25,17 +25,11 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"time"
gcpstorage "cloud.google.com/go/storage"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/source-controller/pkg/gcp"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/s3utils"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/api/option"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@ -49,6 +43,7 @@ import (
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/predicates"
@ -56,9 +51,23 @@ import (
serror "github.com/fluxcd/source-controller/internal/error"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/pkg/gcp"
"github.com/fluxcd/source-controller/pkg/minio"
"github.com/fluxcd/source-controller/pkg/sourceignore"
)
// maxConcurrentBucketFetches is the upper bound on the goroutines used to
// fetch bucket objects. It's important to have a bound, to avoid
// using arbitrary amounts of memory; the actual number is chosen
// according to the queueing rule of thumb with some conservative
// parameters:
// s > Nr / T
// N (number of requestors, i.e., objects to fetch) = 10000
// r (service time -- fetch duration) = 0.01s (~ a megabyte file over 1Gb/s)
// T (total time available) = 1s
// -> s > 100
const maxConcurrentBucketFetches = 100
// bucketReadyConditions contains all the condition information needed
// to summarize the Bucket Ready status condition.
var bucketReadyConditions = summarize.Conditions{
@ -103,9 +112,107 @@ type BucketReconcilerOptions struct {
MaxConcurrentReconciles int
}
// BucketProvider is an interface for fetching objects from a storage provider
// bucket.
type BucketProvider interface {
// BucketExists returns if an object storage bucket with the provided name
// exists, or returns a (client) error.
BucketExists(ctx context.Context, bucketName string) (bool, error)
// FGetObject gets the object from the provided object storage bucket, and
// writes it to targetPath.
// It returns the etag of the successfully fetched file, or any error.
FGetObject(ctx context.Context, bucketName, objectKey, targetPath string) (etag string, err error)
// VisitObjects iterates over the items in the provided object storage
// bucket, calling visit for every item.
// If the underlying client or the visit callback returns an error,
// it returns early.
VisitObjects(ctx context.Context, bucketName string, visit func(key, etag string) error) error
// ObjectIsNotFound returns true if the given error indicates an object
// could not be found.
ObjectIsNotFound(error) bool
// Close closes the provider's client, if supported.
Close(context.Context)
}
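// Example (sketch, hypothetical helper): any BucketProvider implementation can
// be driven through the interface alone. listKeys collects the keys of all
// objects in a bucket, failing early if the bucket cannot be confirmed to exist.
func listKeys(ctx context.Context, provider BucketProvider, bucketName string) ([]string, error) {
    exists, err := provider.BucketExists(ctx, bucketName)
    if err != nil {
        return nil, err
    }
    if !exists {
        return nil, fmt.Errorf("bucket '%s' not found", bucketName)
    }
    var keys []string
    err = provider.VisitObjects(ctx, bucketName, func(key, etag string) error {
        keys = append(keys, key)
        return nil
    })
    return keys, err
}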
// bucketReconcilerFunc is the function type for all the bucket reconciler
// functions.
type bucketReconcilerFunc func(ctx context.Context, obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error)
type bucketReconcilerFunc func(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error)
// etagIndex is an index of storage object keys and their Etag values.
type etagIndex struct {
sync.RWMutex
index map[string]string
}
// newEtagIndex returns a new etagIndex with an empty initialized index.
func newEtagIndex() *etagIndex {
return &etagIndex{
index: make(map[string]string),
}
}
func (i *etagIndex) Add(key, etag string) {
i.Lock()
defer i.Unlock()
i.index[key] = etag
}
func (i *etagIndex) Delete(key string) {
i.Lock()
defer i.Unlock()
delete(i.index, key)
}
func (i *etagIndex) Get(key string) string {
i.RLock()
defer i.RUnlock()
return i.index[key]
}
func (i *etagIndex) Has(key string) bool {
i.RLock()
defer i.RUnlock()
_, ok := i.index[key]
return ok
}
func (i *etagIndex) Index() map[string]string {
i.RLock()
defer i.RUnlock()
index := make(map[string]string)
for k, v := range i.index {
index[k] = v
}
return index
}
func (i *etagIndex) Len() int {
i.RLock()
defer i.RUnlock()
return len(i.index)
}
// Revision calculates the SHA256 checksum of the index.
// The keys are stable sorted, and the SHA256 sum is then calculated for the
// string representation of the key/value pairs, each pair written on a newline
// with a space between them. The sum result is returned as a string.
func (i *etagIndex) Revision() (string, error) {
i.RLock()
defer i.RUnlock()
keyIndex := make([]string, 0, len(i.index))
for k := range i.index {
keyIndex = append(keyIndex, k)
}
sort.Strings(keyIndex)
sum := sha256.New()
for _, k := range keyIndex {
if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i.index[k]))); err != nil {
return "", err
}
}
return fmt.Sprintf("%x", sum.Sum(nil)), nil
}
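// Example (sketch, hypothetical helper): for a small index the revision can be
// reproduced by hand; with the entries below it equals the SHA256 sum of
// "a.yaml etag1\nb.yaml etag2\n", as the pairs are written in sorted key order.
func exampleRevision() (string, error) {
    idx := newEtagIndex()
    idx.Add("b.yaml", "etag2")
    idx.Add("a.yaml", "etag1")
    return idx.Revision()
}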
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
@ -201,9 +308,6 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
}
index := make(etagIndex)
var artifact sourcev1.Artifact
// Create temp working dir
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
if err != nil {
@ -215,10 +319,14 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
defer os.RemoveAll(tmpDir)
// Run the sub-reconcilers and build the result of reconciliation.
var res sreconcile.Result
var resErr error
var (
res sreconcile.Result
resErr error
index = newEtagIndex()
)
for _, rec := range reconcilers {
recResult, err := rec(ctx, obj, index, &artifact, tmpDir)
recResult, err := rec(ctx, obj, index, tmpDir)
// Exit immediately on ResultRequeue.
if recResult == sreconcile.ResultRequeue {
return sreconcile.ResultRequeue, nil
@ -241,8 +349,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
// All artifacts for the resource except for the current one are garbage collected from the storage.
// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
// If the hostname of the URLs on the object does not match the current storage server hostname, they are updated.
func (r *BucketReconciler) reconcileStorage(ctx context.Context,
obj *sourcev1.Bucket, _ etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
_ = r.garbageCollect(ctx, obj)
@ -270,335 +377,84 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context,
// result.
// If a SecretRef is defined, it attempts to fetch the Secret before calling the provider. If the fetch of the Secret
// fails, it records v1beta1.FetchFailedCondition=True and returns early.
func (r *BucketReconciler) reconcileSource(ctx context.Context,
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
var secret *corev1.Secret
if obj.Spec.SecretRef != nil {
secretName := types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.Spec.SecretRef.Name,
}
secret = &corev1.Secret{}
if err := r.Get(ctx, secretName, secret); err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to get secret '%s': %w", secretName.String(), err),
Reason: sourcev1.AuthenticationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, e.Err.Error())
// Return error as the world as observed may change
return sreconcile.ResultEmpty, e
}
func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
secret, err := r.getBucketSecret(ctx, obj)
if err != nil {
e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
// Return error as the world as observed may change
return sreconcile.ResultEmpty, e
}
// Construct provider client
var provider BucketProvider
switch obj.Spec.Provider {
case sourcev1.GoogleBucketProvider:
return r.reconcileGCPSource(ctx, obj, index, artifact, secret, dir)
if err = gcp.ValidateSecret(secret); err != nil {
e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
return sreconcile.ResultEmpty, e
}
if provider, err = gcp.NewClient(ctx, secret); err != nil {
e := &serror.Event{Err: err, Reason: "ClientError"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
return sreconcile.ResultEmpty, e
}
default:
return r.reconcileMinioSource(ctx, obj, index, artifact, secret, dir)
}
}
// reconcileMinioSource ensures the upstream Minio client compatible bucket can be reached and downloaded from using the
// declared configuration, and observes its state.
//
// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into
// account. In case of an error during the download process (including transient errors), it records
// v1beta1.FetchFailedCondition=True and returns early.
// On a successful download, it removes v1beta1.FetchFailedCondition, and compares the current revision of HEAD to
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
func (r *BucketReconciler) reconcileMinioSource(ctx context.Context,
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, secret *corev1.Secret, dir string) (sreconcile.Result, error) {
// Build the client with the configuration from the object and secret
s3Client, err := r.buildMinioClient(obj, secret)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to construct S3 client: %w", err),
Reason: sourcev1.BucketOperationFailedReason,
if err = minio.ValidateSecret(secret); err != nil {
e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
return sreconcile.ResultEmpty, e
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
// Return error as the contents of the secret may change
return sreconcile.ResultEmpty, e
}
// Confirm bucket exists
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()
exists, err := s3Client.BucketExists(ctxTimeout, obj.Spec.BucketName)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to verify existence of bucket '%s': %w", obj.Spec.BucketName, err),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
if !exists {
e := &serror.Event{
Err: fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
// Look for file with ignore rules first
path := filepath.Join(dir, sourceignore.IgnoreFile)
if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" {
e := &serror.Event{
Err: fmt.Errorf("failed to get '%s' file: %w", sourceignore.IgnoreFile, err),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
if provider, err = minio.NewClient(obj, secret); err != nil {
e := &serror.Event{Err: err, Reason: "ClientError"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
return sreconcile.ResultEmpty, e
}
}
ps, err := sourceignore.ReadIgnoreFile(path, nil)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to read '%s' file: %w", sourceignore.IgnoreFile, err),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
// Fetch etag index
if err = fetchEtagIndex(ctx, provider, obj, index, dir); err != nil {
e := &serror.Event{Err: err, Reason: sourcev1.BucketOperationFailedReason}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
return sreconcile.ResultEmpty, e
}
// In-spec patterns take precedence
if obj.Spec.Ignore != nil {
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
}
matcher := sourceignore.NewMatcher(ps)
// Build up an index of object keys and their etags
// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
// detect both structural and file changes
for object := range s3Client.ListObjects(ctxTimeout, obj.Spec.BucketName, minio.ListObjectsOptions{
Recursive: true,
UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
}) {
if err = object.Err; err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to list objects from bucket '%s': %w", obj.Spec.BucketName, err),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
// Ignore directories and the .sourceignore file
if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile {
continue
}
// Ignore matches
if matcher.Match(strings.Split(object.Key, "/"), false) {
continue
}
index[object.Key] = object.ETag
}
// Calculate revision checksum from the collected index values
// Calculate revision
revision, err := index.Revision()
if err != nil {
ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision")
return sreconcile.ResultEmpty, &serror.Event{
Err: fmt.Errorf("failed to calculate revision: %w", err),
Reason: meta.FailedReason,
}
}
if !obj.GetArtifact().HasRevision(revision) {
// Mark observations about the revision on the object
message := fmt.Sprintf("new upstream revision '%s'", revision)
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
conditions.MarkReconciling(obj, "NewRevision", message)
// Download the files in parallel, but with a limited number of workers
group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
const workers = 4
sem := semaphore.NewWeighted(workers)
for key := range index {
k := key
if err := sem.Acquire(groupCtx, 1); err != nil {
return err
}
group.Go(func() error {
defer sem.Release(1)
localPath := filepath.Join(dir, k)
if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath, minio.GetObjectOptions{}); err != nil {
return fmt.Errorf("failed to get '%s' file: %w", k, err)
}
return nil
})
}
return nil
})
if err = group.Wait(); err != nil {
e := &serror.Event{
Err: fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
}
conditions.Delete(obj, sourcev1.FetchFailedCondition)
// Create potential new artifact
*artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
return sreconcile.ResultSuccess, nil
}
// reconcileGCPSource ensures the upstream Google Cloud Storage bucket can be reached and downloaded from using the
// declared configuration, and observes its state.
//
// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into
// account. In case of an error during the download process (including transient errors), it records
// v1beta1.DownloadFailedCondition=True and returns early.
// On a successful download, it removes v1beta1.DownloadFailedCondition, and compares the current revision of HEAD to
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
func (r *BucketReconciler) reconcileGCPSource(ctx context.Context,
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, secret *corev1.Secret, dir string) (sreconcile.Result, error) {
gcpClient, err := r.buildGCPClient(ctx, secret)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to construct GCP client: %w", err),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
// Return error as the contents of the secret may change
return sreconcile.ResultEmpty, e
}
defer gcpClient.Close(ctrl.LoggerFrom(ctx))
// Confirm bucket exists
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()
exists, err := gcpClient.BucketExists(ctxTimeout, obj.Spec.BucketName)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to verify existence of bucket '%s': %w", obj.Spec.BucketName, err),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
if !exists {
e := &serror.Event{
Err: fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
// Look for file with ignore rules first
path := filepath.Join(dir, sourceignore.IgnoreFile)
if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
if err != gcpstorage.ErrObjectNotExist {
e := &serror.Event{
Err: fmt.Errorf("failed to get '%s' file: %w", sourceignore.IgnoreFile, err),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
}
ps, err := sourceignore.ReadIgnoreFile(path, nil)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to read '%s' file: %w", sourceignore.IgnoreFile, err),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
// In-spec patterns take precedence
if obj.Spec.Ignore != nil {
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
}
matcher := sourceignore.NewMatcher(ps)
// Build up an index of object keys and their etags
// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
// detect both structural and file changes
objects := gcpClient.ListObjects(ctxTimeout, obj.Spec.BucketName, nil)
for {
object, err := objects.Next()
// Mark observations about the revision on the object
defer func() {
// As fetchIndexFiles can make last-minute modifications to the etag
// index, we need to re-calculate the revision at the end
revision, err := index.Revision()
if err != nil {
if err == gcp.IteratorDone {
break
}
e := &serror.Event{
Err: fmt.Errorf("failed to list objects from bucket '%s': %w", obj.Spec.BucketName, err),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
return sreconcile.ResultEmpty, e
ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision after fetching etag index")
return
}
if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile {
continue
if !obj.GetArtifact().HasRevision(revision) {
message := fmt.Sprintf("new upstream revision '%s'", revision)
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
conditions.MarkReconciling(obj, "NewRevision", message)
}
if matcher.Match(strings.Split(object.Name, "/"), false) {
continue
}
index[object.Name] = object.Etag
}
// Calculate revision checksum from the collected index values
revision, err := index.Revision()
if err != nil {
return sreconcile.ResultEmpty, &serror.Event{
Err: fmt.Errorf("failed to calculate revision: %w", err),
Reason: meta.FailedReason,
}
}
}()
if !obj.GetArtifact().HasRevision(revision) {
// Mark observations about the revision on the object
message := fmt.Sprintf("new upstream revision '%s'", revision)
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
conditions.MarkReconciling(obj, "NewRevision", message)
// Download the files in parallel, but with a limited number of workers
group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
const workers = 4
sem := semaphore.NewWeighted(workers)
for key := range index {
k := key
if err := sem.Acquire(groupCtx, 1); err != nil {
return err
}
group.Go(func() error {
defer sem.Release(1)
localPath := filepath.Join(dir, k)
if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath); err != nil {
return fmt.Errorf("failed to get '%s' file: %w", k, err)
}
return nil
})
}
return nil
})
if err = group.Wait(); err != nil {
e := &serror.Event{
Err: fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err),
Reason: sourcev1.BucketOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
if err = fetchIndexFiles(ctx, provider, obj, index, dir); err != nil {
e := &serror.Event{Err: err, Reason: sourcev1.BucketOperationFailedReason}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
return sreconcile.ResultEmpty, e
}
}
conditions.Delete(obj, sourcev1.FetchFailedCondition)
// Create potential new artifact
*artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
conditions.Delete(obj, sourcev1.FetchFailedCondition)
return sreconcile.ResultSuccess, nil
}
@ -609,8 +465,19 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context,
// If the given artifact does not differ from the object's current, it returns early.
// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
// updated to its path.
func (r *BucketReconciler) reconcileArtifact(ctx context.Context,
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
// Calculate revision
revision, err := index.Revision()
if err != nil {
return sreconcile.ResultEmpty, &serror.Event{
Err: fmt.Errorf("failed to calculate revision of new artifact: %w", err),
Reason: meta.FailedReason,
}
}
// Create artifact
artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
// Always restore the Ready condition in case it got removed due to a transient error
defer func() {
if obj.GetArtifact().HasRevision(artifact.Revision) {
@ -640,13 +507,13 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context,
}
// Ensure artifact directory exists and acquire lock
if err := r.Storage.MkdirAll(*artifact); err != nil {
if err := r.Storage.MkdirAll(artifact); err != nil {
return sreconcile.ResultEmpty, &serror.Event{
Err: fmt.Errorf("failed to create artifact directory: %w", err),
Reason: sourcev1.StorageOperationFailedReason,
}
}
unlock, err := r.Storage.Lock(*artifact)
unlock, err := r.Storage.Lock(artifact)
if err != nil {
return sreconcile.ResultEmpty, &serror.Event{
Err: fmt.Errorf("failed to acquire lock for artifact: %w", err),
@ -656,7 +523,7 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context,
defer unlock()
// Archive directory to storage
if err := r.Storage.Archive(artifact, dir, nil); err != nil {
if err := r.Storage.Archive(&artifact, dir, nil); err != nil {
return sreconcile.ResultEmpty, &serror.Event{
Err: fmt.Errorf("unable to archive artifact to storage: %s", err),
Reason: sourcev1.StorageOperationFailedReason,
@ -665,13 +532,13 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context,
r.annotatedEventLogf(ctx, obj, map[string]string{
"revision": artifact.Revision,
"checksum": artifact.Checksum,
}, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", len(index), obj.Spec.BucketName)
}, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName)
// Record it on the object
obj.Status.Artifact = artifact.DeepCopy()
// Update symlink on a "best effort" basis
url, err := r.Storage.Symlink(*artifact, "latest.tar.gz")
url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
if err != nil {
r.eventLogf(ctx, obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
"failed to update status URL symlink: %s", err)
@ -729,74 +596,21 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc
return nil
}
// buildMinioClient constructs a minio.Client with the data from the given object and Secret.
// It returns an error if the Secret does not have the required fields, or if there is no credential handler
// configured.
func (r *BucketReconciler) buildMinioClient(obj *sourcev1.Bucket, secret *corev1.Secret) (*minio.Client, error) {
opts := minio.Options{
Region: obj.Spec.Region,
Secure: !obj.Spec.Insecure,
// getBucketSecret attempts to fetch the Secret referenced by obj.Spec.SecretRef,
// if specified. It returns any client error.
func (r *BucketReconciler) getBucketSecret(ctx context.Context, obj *sourcev1.Bucket) (*corev1.Secret, error) {
if obj.Spec.SecretRef == nil {
return nil, nil
}
if secret != nil {
var accessKey, secretKey string
if k, ok := secret.Data["accesskey"]; ok {
accessKey = string(k)
}
if k, ok := secret.Data["secretkey"]; ok {
secretKey = string(k)
}
if accessKey == "" || secretKey == "" {
return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
}
opts.Creds = credentials.NewStaticV4(accessKey, secretKey, "")
} else if obj.Spec.Provider == sourcev1.AmazonBucketProvider {
opts.Creds = credentials.NewIAM("")
secretName := types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.Spec.SecretRef.Name,
}
return minio.New(obj.Spec.Endpoint, &opts)
}
// buildGCPClient constructs a gcp.GCPClient with the data from the given Secret.
// It returns an error if the Secret does not have the required field, or if the client construction fails.
func (r *BucketReconciler) buildGCPClient(ctx context.Context, secret *corev1.Secret) (*gcp.GCPClient, error) {
var client *gcp.GCPClient
var err error
if secret != nil {
if err := gcp.ValidateSecret(secret.Data, secret.Name); err != nil {
return nil, err
}
client, err = gcp.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"]))
if err != nil {
return nil, err
}
} else {
client, err = gcp.NewClient(ctx)
if err != nil {
return nil, err
}
secret := &corev1.Secret{}
if err := r.Get(ctx, secretName, secret); err != nil {
return nil, fmt.Errorf("failed to get secret '%s': %w", secretName.String(), err)
}
return client, nil
}
// etagIndex is an index of bucket keys and their Etag values.
type etagIndex map[string]string
// Revision calculates the SHA256 checksum of the index.
// The keys are sorted to ensure a stable order, and the SHA256 sum is then calculated for the string representations of
// the key/value pairs, each pair written on a newline
// The sum result is returned as a string.
func (i etagIndex) Revision() (string, error) {
keyIndex := make([]string, 0, len(i))
for k := range i {
keyIndex = append(keyIndex, k)
}
sort.Strings(keyIndex)
sum := sha256.New()
for _, k := range keyIndex {
if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i[k]))); err != nil {
return "", err
}
}
return fmt.Sprintf("%x", sum.Sum(nil)), nil
return secret, nil
}
// eventLogf records an event and logs at the same time.
@ -819,3 +633,106 @@ func (r *BucketReconciler) annotatedEventLogf(ctx context.Context,
}
r.AnnotatedEventf(obj, annotations, eventType, reason, msg)
}
// fetchEtagIndex fetches the current etag index for the bucket specified in
// obj using the given provider, while filtering the objects using .sourceignore
// rules. When the objects are later fetched by fetchIndexFiles, the etag values
// in the index are updated to the current values to ensure accuracy.
func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error {
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()
// Confirm bucket exists
exists, err := provider.BucketExists(ctxTimeout, obj.Spec.BucketName)
if err != nil {
return fmt.Errorf("failed to confirm existence of '%s' bucket: %w", obj.Spec.BucketName, err)
}
if !exists {
err = fmt.Errorf("bucket '%s' not found", obj.Spec.BucketName)
return err
}
// Look for file with ignore rules first
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
if _, err := provider.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
if !provider.ObjectIsNotFound(err) {
return err
}
}
ps, err := sourceignore.ReadIgnoreFile(path, nil)
if err != nil {
return err
}
// In-spec patterns take precedence
if obj.Spec.Ignore != nil {
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
}
matcher := sourceignore.NewMatcher(ps)
// Build up index
err = provider.VisitObjects(ctxTimeout, obj.Spec.BucketName, func(key, etag string) error {
if strings.HasSuffix(key, "/") || key == sourceignore.IgnoreFile {
return nil
}
if matcher.Match(strings.Split(key, "/"), false) {
return nil
}
index.Add(key, etag)
return nil
})
if err != nil {
return fmt.Errorf("indexation of objects from bucket '%s' failed: %w", obj.Spec.BucketName, err)
}
return nil
}
// fetchIndexFiles fetches the object files for the keys in the given etagIndex
// using the given provider, and stores them in tempDir. It downloads in
// parallel, limited to maxConcurrentBucketFetches concurrent fetches.
// Since an index is provided, the bucket is assumed to exist.
func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error {
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()
// Download in parallel, but bound the concurrency. According to
// AWS and GCP docs, rate limits are either soft or don't exist:
// - https://cloud.google.com/storage/quotas
// - https://docs.aws.amazon.com/general/latest/gr/s3.html
// .. so, the limiting factor is this process keeping a small footprint.
group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
sem := semaphore.NewWeighted(maxConcurrentBucketFetches)
for key, etag := range index.Index() {
k := key
t := etag
if err := sem.Acquire(groupCtx, 1); err != nil {
return err
}
group.Go(func() error {
defer sem.Release(1)
localPath := filepath.Join(tempDir, k)
etag, err := provider.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath)
if err != nil {
if provider.ObjectIsNotFound(err) {
ctrl.LoggerFrom(ctx).Info(fmt.Sprintf("indexed object '%s' disappeared from '%s' bucket", k, obj.Spec.BucketName))
index.Delete(k)
return nil
}
return fmt.Errorf("failed to get '%s' object: %w", k, err)
}
if t != etag {
index.Add(k, etag)
}
return nil
})
}
return nil
})
if err := group.Wait(); err != nil {
return fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err)
}
return nil
}
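// Example (sketch, hypothetical helper): the two-phase flow used by
// reconcileSource, shown in isolation. The caller is assumed to have
// constructed a BucketProvider for obj.Spec.Provider and to clean up dir.
func syncBucket(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, dir string) (*etagIndex, error) {
    defer provider.Close(ctx)
    index := newEtagIndex()
    if err := fetchEtagIndex(ctx, provider, obj, index, dir); err != nil {
        return nil, err
    }
    if err := fetchIndexFiles(ctx, provider, obj, index, dir); err != nil {
        return nil, err
    }
    return index, nil
}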


@ -0,0 +1,322 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"time"
"gotest.tools/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
)
type mockBucketObject struct {
etag string
data string
}
type mockBucketClient struct {
bucketName string
objects map[string]mockBucketObject
}
var mockNotFound = fmt.Errorf("not found")
func (m mockBucketClient) BucketExists(_ context.Context, name string) (bool, error) {
return name == m.bucketName, nil
}
func (m mockBucketClient) FGetObject(_ context.Context, bucket, obj, path string) (string, error) {
if bucket != m.bucketName {
return "", fmt.Errorf("bucket does not exist")
}
// tiny bit of protocol, for convenience: if asked for an object "error", then return an error.
if obj == "error" {
return "", fmt.Errorf("I was asked to report an error")
}
object, ok := m.objects[obj]
if !ok {
return "", mockNotFound
}
if err := os.WriteFile(path, []byte(object.data), os.FileMode(0660)); err != nil {
return "", err
}
return object.etag, nil
}
func (m mockBucketClient) ObjectIsNotFound(e error) bool {
return e == mockNotFound
}
func (m mockBucketClient) VisitObjects(_ context.Context, _ string, f func(key, etag string) error) error {
for key, obj := range m.objects {
if err := f(key, obj.etag); err != nil {
return err
}
}
return nil
}
func (m mockBucketClient) Close(_ context.Context) {
return
}
func (m *mockBucketClient) addObject(key string, object mockBucketObject) {
if m.objects == nil {
m.objects = make(map[string]mockBucketObject)
}
m.objects[key] = object
}
func (m *mockBucketClient) objectsToEtagIndex() *etagIndex {
i := newEtagIndex()
for k, v := range m.objects {
i.Add(k, v.etag)
}
return i
}
func Test_fetchEtagIndex(t *testing.T) {
bucketName := "all-my-config"
bucket := sourcev1.Bucket{
Spec: sourcev1.BucketSpec{
BucketName: bucketName,
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
},
}
t.Run("fetches etag index", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
client := mockBucketClient{bucketName: bucketName}
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"})
client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"})
client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"})
index := newEtagIndex()
err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, index.Len(), 3)
})
t.Run("an error while bucket does not exist", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
client := mockBucketClient{bucketName: "other-bucket-name"}
index := newEtagIndex()
err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
assert.ErrorContains(t, err, "not found")
})
t.Run("filters with .sourceignore rules", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
client := mockBucketClient{bucketName: bucketName}
client.addObject(".sourceignore", mockBucketObject{etag: "sourceignore1", data: `*.txt`})
client.addObject("foo.yaml", mockBucketObject{etag: "etag1", data: "foo.yaml"})
client.addObject("foo.txt", mockBucketObject{etag: "etag2", data: "foo.txt"})
index := newEtagIndex()
err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
t.Fatal(err)
}
if _, err := os.Stat(filepath.Join(tmp, ".sourceignore")); err != nil {
t.Error(err)
}
if ok := index.Has("foo.txt"); ok {
t.Error(fmt.Errorf("expected 'foo.txt' index item to not exist"))
}
assert.Equal(t, index.Len(), 1)
})
t.Run("filters with ignore rules from object", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
client := mockBucketClient{bucketName: bucketName}
client.addObject(".sourceignore", mockBucketObject{etag: "sourceignore1", data: `*.txt`})
client.addObject("foo.txt", mockBucketObject{etag: "etag1", data: "foo.txt"})
ignore := "!*.txt"
bucket := bucket.DeepCopy()
bucket.Spec.Ignore = &ignore
index := newEtagIndex()
err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
t.Fatal(err)
}
if _, err := os.Stat(filepath.Join(tmp, ".sourceignore")); err != nil {
t.Error(err)
}
assert.Equal(t, index.Len(), 1)
if ok := index.Has("foo.txt"); !ok {
t.Error(fmt.Errorf("expected 'foo.txt' index item to exist"))
}
})
}
func Test_fetchFiles(t *testing.T) {
bucketName := "all-my-config"
bucket := sourcev1.Bucket{
Spec: sourcev1.BucketSpec{
BucketName: bucketName,
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
},
}
t.Run("fetches files", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
client := mockBucketClient{bucketName: bucketName}
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"})
client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"})
client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"})
index := client.objectsToEtagIndex()
err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
t.Fatal(err)
}
for path := range index.Index() {
p := filepath.Join(tmp, path)
_, err := os.Stat(p)
if err != nil {
t.Error(err)
}
}
})
t.Run("an error while fetching returns an error for the whole procedure", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
client := mockBucketClient{bucketName: bucketName, objects: map[string]mockBucketObject{}}
client.objects["error"] = mockBucketObject{}
err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), client.objectsToEtagIndex(), tmp)
if err == nil {
t.Fatal("expected error but got nil")
}
})
t.Run("a changed etag updates the index", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
client := mockBucketClient{bucketName: bucketName}
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag2"})
index := newEtagIndex()
index.Add("foo.yaml", "etag1")
err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
t.Fatal(err)
}
f := index.Get("foo.yaml")
assert.Equal(t, f, "etag2")
})
t.Run("a disappeared index entry is removed from the index", func(t *testing.T) {
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
client := mockBucketClient{bucketName: bucketName}
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"})
index := newEtagIndex()
index.Add("foo.yaml", "etag1")
// Does not exist on server
index.Add("bar.yaml", "etag2")
err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
t.Fatal(err)
}
f := index.Get("foo.yaml")
assert.Equal(t, f, "etag1")
assert.Check(t, !index.Has("bar.yaml"))
})
t.Run("can fetch more than maxConcurrentFetches", func(t *testing.T) {
// this will fail if, for example, the semaphore is not used correctly and blocks
tmp, err := os.MkdirTemp("", "test-bucket")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)
client := mockBucketClient{bucketName: bucketName}
for i := 0; i < 2*maxConcurrentBucketFetches; i++ {
f := fmt.Sprintf("file-%d", i)
client.addObject(f, mockBucketObject{etag: f, data: f})
}
index := client.objectsToEtagIndex()
err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
if err != nil {
t.Fatal(err)
}
})
}

File diff suppressed because it is too large

go.mod (2 lines changed)

@ -26,6 +26,7 @@ require (
github.com/go-git/go-billy/v5 v5.3.1
github.com/go-git/go-git/v5 v5.4.2
github.com/go-logr/logr v1.2.2
github.com/google/uuid v1.3.0
github.com/libgit2/git2go/v33 v33.0.6
github.com/minio/minio-go/v7 v7.0.15
github.com/onsi/gomega v1.17.0
@ -97,7 +98,6 @@ require (
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gax-go/v2 v2.1.1 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
github.com/gorilla/mux v1.8.0 // indirect

internal/mock/gcs/server.go (new file, 220 lines)

@ -0,0 +1,220 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gcs
import (
"crypto/md5"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"strings"
raw "google.golang.org/api/storage/v1"
)
var (
ObjectNotFound = errors.New("object not found")
)
// Object is a mock Server object.
type Object struct {
Key string
Generation int64
MetaGeneration int64
ContentType string
Content []byte
}
// Server is a simple Google Cloud Storage mock server.
// It serves the provided Objects for the BucketName on the HTTPAddress when
// Start or StartTLS is called.
// It provides primitive support for "Generation Conditions" when Object
// contents are fetched.
// Ref: https://pkg.go.dev/cloud.google.com/go/storage#hdr-Conditions
type Server struct {
srv *httptest.Server
mux *http.ServeMux
BucketName string
Objects []*Object
}
func NewServer(bucketName string) *Server {
s := &Server{BucketName: bucketName}
s.mux = http.NewServeMux()
s.mux.Handle("/", http.HandlerFunc(s.handler))
s.srv = httptest.NewUnstartedServer(s.mux)
return s
}
func (s *Server) Start() {
s.srv.Start()
}
func (s *Server) StartTLS(config *tls.Config) {
s.srv.TLS = config
s.srv.StartTLS()
}
func (s *Server) Stop() {
s.srv.Close()
}
func (s *Server) HTTPAddress() string {
return s.srv.URL
}
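// Example (sketch, hypothetical helper; assumes an additional "io" import):
// objects are served verbatim at "/<bucket>/<key>", so a plain HTTP GET
// against HTTPAddress retrieves an object's content from the mock.
func exampleGetObject() ([]byte, error) {
    srv := NewServer("dummy")
    srv.Objects = []*Object{{Key: "included/file.txt", Generation: 1, Content: []byte("hello")}}
    srv.Start()
    defer srv.Stop()
    resp, err := http.Get(srv.HTTPAddress() + "/dummy/included/file.txt")
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return io.ReadAll(resp.Body)
}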
func (s *Server) getAllObjects() *raw.Objects {
objs := &raw.Objects{}
for _, o := range s.Objects {
objs.Items = append(objs.Items, getGCSObject(s.BucketName, *o))
}
return objs
}
func (s *Server) getObjectFile(key string, generation int64) ([]byte, error) {
for _, o := range s.Objects {
if o.Key == key {
if generation == 0 || generation == o.Generation {
return o.Content, nil
}
}
}
return nil, ObjectNotFound
}
func (s *Server) handler(w http.ResponseWriter, r *http.Request) {
switch {
// Handle Bucket metadata related queries
case strings.HasPrefix(r.RequestURI, "/b/"):
switch {
// Return metadata about the Bucket
case r.RequestURI == fmt.Sprintf("/b/%s?alt=json&prettyPrint=false&projection=full", s.BucketName):
etag := md5.New()
for _, v := range s.Objects {
etag.Write(v.Content)
}
response := getGCSBucket(s.BucketName, fmt.Sprintf("%x", etag.Sum(nil)))
jsonResponse, err := json.Marshal(response)
if err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
w.Write(jsonResponse)
return
// Return metadata about a Bucket object
case strings.Contains(r.RequestURI, "/o/"):
var obj *Object
for _, o := range s.Objects {
// The object key in the URI is escaped.
// e.g.: /b/dummy/o/included%2Ffile.txt?alt=json&prettyPrint=false&projection=full
if r.RequestURI == fmt.Sprintf("/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", s.BucketName, url.QueryEscape(o.Key)) {
obj = o
break
}
}
if obj != nil {
response := getGCSObject(s.BucketName, *obj)
jsonResponse, err := json.Marshal(response)
if err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
w.Write(jsonResponse)
return
}
w.WriteHeader(404)
return
// Return metadata about all objects in the Bucket
case strings.Contains(r.RequestURI, "/o?"):
response := s.getAllObjects()
jsonResponse, err := json.Marshal(response)
if err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
w.Write(jsonResponse)
return
default:
w.WriteHeader(404)
return
}
// Handle object file query
default:
bucketPrefix := fmt.Sprintf("/%s/", s.BucketName)
if strings.HasPrefix(r.RequestURI, bucketPrefix) {
// The URL path is of the format /<bucket>/included/file.txt.
// Extract the object key by discarding the bucket prefix.
key := strings.TrimPrefix(r.URL.Path, bucketPrefix)
// Support "Generation Conditions"
// https://pkg.go.dev/cloud.google.com/go/storage#hdr-Conditions
var generation int64
if matchGeneration := r.URL.Query().Get("ifGenerationMatch"); matchGeneration != "" {
var err error
if generation, err = strconv.ParseInt(matchGeneration, 10, 64); err != nil {
w.WriteHeader(500)
return
}
}
// Handle returning object file in a bucket.
response, err := s.getObjectFile(key, generation)
if err != nil {
w.WriteHeader(404)
return
}
w.WriteHeader(200)
w.Write(response)
return
}
w.WriteHeader(404)
return
}
}
func getGCSObject(bucket string, obj Object) *raw.Object {
hash := md5.Sum(obj.Content)
etag := fmt.Sprintf("%x", hash)
return &raw.Object{
Bucket: bucket,
Name: obj.Key,
ContentType: obj.ContentType,
Generation: obj.Generation,
Metageneration: obj.MetaGeneration,
Md5Hash: etag,
Etag: etag,
}
}
func getGCSBucket(name, eTag string) *raw.Bucket {
return &raw.Bucket{
Name: name,
Location: "loc",
Etag: eTag,
}
}

internal/mock/s3/server.go (new file, 157 lines)

@ -0,0 +1,157 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package s3
import (
"crypto/md5"
"crypto/tls"
"fmt"
"net/http"
"net/http/httptest"
"path"
"path/filepath"
"strings"
"time"
)
// Object is a mock Server object.
type Object struct {
Key string
LastModified time.Time
ContentType string
Content []byte
}
// Server is a simple AWS S3 mock server.
// It serves the provided Objects for the BucketName on the HTTPAddress when
// Start or StartTLS is called.
type Server struct {
srv *httptest.Server
mux *http.ServeMux
BucketName string
Objects []*Object
}
func NewServer(bucketName string) *Server {
s := &Server{BucketName: bucketName}
s.mux = http.NewServeMux()
s.mux.Handle("/", http.HandlerFunc(s.handler))
s.srv = httptest.NewUnstartedServer(s.mux)
return s
}
func (s *Server) Start() {
s.srv.Start()
}
func (s *Server) StartTLS(config *tls.Config) {
s.srv.TLS = config
s.srv.StartTLS()
}
func (s *Server) Stop() {
s.srv.Close()
}
func (s *Server) HTTPAddress() string {
return s.srv.URL
}
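// Example (sketch, hypothetical helper): a HEAD request on "/<bucket>" is how
// S3 clients typically probe bucket existence; the mock answers 200 for its
// configured BucketName and 404 for anything else.
func exampleBucketExists(srv *Server, name string) (bool, error) {
    req, err := http.NewRequest(http.MethodHead, srv.HTTPAddress()+"/"+name, nil)
    if err != nil {
        return false, err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return false, err
    }
    defer resp.Body.Close()
    return resp.StatusCode == http.StatusOK, nil
}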
func (s *Server) handler(w http.ResponseWriter, r *http.Request) {
key := path.Base(r.URL.Path)
switch key {
case s.BucketName:
w.Header().Add("Content-Type", "application/xml")
if r.Method == http.MethodHead {
w.WriteHeader(200)
return
}
if r.URL.Query().Has("location") {
w.WriteHeader(200)
w.Write([]byte(`
<?xml version="1.0" encoding="UTF-8"?>
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">Europe</LocationConstraint>
`))
return
}
contents := ""
for _, o := range s.Objects {
etag := md5.Sum(o.Content)
contents += fmt.Sprintf(`
<Contents>
<Key>%s</Key>
<LastModified>%s</LastModified>
<Size>%d</Size>
<ETag>&quot;%x&quot;</ETag>
<StorageClass>STANDARD</StorageClass>
</Contents>`, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag)
}
fmt.Fprintf(w, `
<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>%s</Name>
<Prefix/>
<Marker/>
<KeyCount>%d</KeyCount>
<MaxKeys>1000</MaxKeys>
<IsTruncated>false</IsTruncated>
%s
</ListBucketResult>
`, s.BucketName, len(s.Objects), contents)
default:
key, err := filepath.Rel("/"+s.BucketName, r.URL.Path)
if err != nil {
w.WriteHeader(500)
return
}
var found *Object
for _, o := range s.Objects {
if key == o.Key {
found = o
}
}
if found == nil {
w.WriteHeader(404)
return
}
etag := md5.Sum(found.Content)
lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1)
w.Header().Add("Content-Type", found.ContentType)
w.Header().Add("Last-Modified", lastModified)
w.Header().Add("ETag", fmt.Sprintf("\"%x\"", etag))
w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content)))
if r.Method == http.MethodHead {
w.WriteHeader(200)
return
}
w.WriteHeader(200)
w.Write(found.Content)
}
}


@ -28,6 +28,8 @@ import (
"github.com/go-logr/logr"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
)
var (
@ -37,12 +39,10 @@ var (
// ErrorDirectoryExists is an error returned when the filename provided
// is a directory.
ErrorDirectoryExists = errors.New("filename is a directory")
// ErrorObjectDoesNotExist is an error returned when the object whose name
// is provided does not exist.
ErrorObjectDoesNotExist = errors.New("object does not exist")
)
type GCPClient struct {
// GCSClient is a minimal Google Cloud Storage client for fetching objects.
type GCSClient struct {
// client for interacting with the Google Cloud
// Storage APIs.
*gcpstorage.Client
@ -50,27 +50,39 @@ type GCPClient struct {
// NewClient creates a new GCP storage client. The Client will automatically look for the Google Application
// Credential environment variable or look for the Google Application Credential file.
func NewClient(ctx context.Context, opts ...option.ClientOption) (*GCPClient, error) {
client, err := gcpstorage.NewClient(ctx, opts...)
if err != nil {
return nil, err
func NewClient(ctx context.Context, secret *corev1.Secret) (*GCSClient, error) {
c := &GCSClient{}
if secret != nil {
client, err := gcpstorage.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"]))
if err != nil {
return nil, err
}
c.Client = client
} else {
client, err := gcpstorage.NewClient(ctx)
if err != nil {
return nil, err
}
c.Client = client
}
return &GCPClient{Client: client}, nil
return c, nil
}
// ValidateSecret validates the credential secrets
// It ensures that needed secret fields are not missing.
func ValidateSecret(secret map[string][]byte, name string) error {
if _, exists := secret["serviceaccount"]; !exists {
return fmt.Errorf("invalid '%s' secret data: required fields 'serviceaccount'", name)
// ValidateSecret validates the credential secret. The provided Secret may
// be nil.
func ValidateSecret(secret *corev1.Secret) error {
if secret == nil {
return nil
}
if _, exists := secret.Data["serviceaccount"]; !exists {
return fmt.Errorf("invalid '%s' secret data: required fields 'serviceaccount'", secret.Name)
}
return nil
}
// BucketExists checks if the bucket with the provided name exists.
func (c *GCPClient) BucketExists(ctx context.Context, bucketName string) (bool, error) {
// BucketExists returns if an object storage bucket with the provided name
// exists, or returns a (client) error.
func (c *GCSClient) BucketExists(ctx context.Context, bucketName string) (bool, error) {
_, err := c.Client.Bucket(bucketName).Attrs(ctx)
if err == gcpstorage.ErrBucketNotExist {
// Not returning error to be compatible with minio's API.
@ -82,34 +94,23 @@ func (c *GCPClient) BucketExists(ctx context.Context, bucketName string) (bool,
return true, nil
}
// ObjectExists checks if the object with the provided name exists.
func (c *GCPClient) ObjectExists(ctx context.Context, bucketName, objectName string) (bool, error) {
_, err := c.Client.Bucket(bucketName).Object(objectName).Attrs(ctx)
// ErrObjectNotExist is returned if the object does not exist
if err == gcpstorage.ErrObjectNotExist {
return false, err
}
if err != nil {
return false, err
}
return true, nil
}
// FGetObject gets the object from the bucket and downloads the object locally
func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) error {
// FGetObject gets the object from the provided object storage bucket, and
// writes it to targetPath.
// It returns the etag of the successfully fetched file, or any error.
func (c *GCSClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) (string, error) {
// Verify if destination already exists.
dirStatus, err := os.Stat(localPath)
if err == nil {
// If the destination exists and is a directory.
if dirStatus.IsDir() {
return ErrorDirectoryExists
return "", ErrorDirectoryExists
}
}
// Proceed if file does not exist. return for all other errors.
if err != nil {
if !os.IsNotExist(err) {
return err
return "", err
}
}
@ -118,56 +119,79 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca
if objectDir != "" {
// Create any missing top level directories.
if err := os.MkdirAll(objectDir, 0700); err != nil {
return err
return "", err
}
}
// ObjectExists verifies if object exists and you have permission to access.
// Check if the object exists and if you have permission to access it.
exists, err := c.ObjectExists(ctx, bucketName, objectName)
// Get Object attributes.
objAttr, err := c.Client.Bucket(bucketName).Object(objectName).Attrs(ctx)
if err != nil {
return err
}
if !exists {
return ErrorObjectDoesNotExist
return "", err
}
// Prepare target file.
objectFile, err := os.OpenFile(localPath, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
return "", err
}
// Get Object from GCP Bucket
objectReader, err := c.Client.Bucket(bucketName).Object(objectName).NewReader(ctx)
// Get Object data.
objectReader, err := c.Client.Bucket(bucketName).Object(objectName).If(gcpstorage.Conditions{
GenerationMatch: objAttr.Generation,
}).NewReader(ctx)
if err != nil {
return err
return "", err
}
defer objectReader.Close()
defer func() {
if err = objectReader.Close(); err != nil {
ctrl.LoggerFrom(ctx).Error(err, "failed to close object reader")
}
}()
// Write Object to file.
if _, err := io.Copy(objectFile, objectReader); err != nil {
return err
return "", err
}
// Close the file.
if err := objectFile.Close(); err != nil {
return err
return "", err
}
return objAttr.Etag, nil
}
// VisitObjects iterates over the items in the provided object storage
// bucket, calling visit for every item.
// If the underlying client or the visit callback returns an error,
// it returns early.
func (c *GCSClient) VisitObjects(ctx context.Context, bucketName string, visit func(path, etag string) error) error {
items := c.Client.Bucket(bucketName).Objects(ctx, nil)
for {
object, err := items.Next()
if err == IteratorDone {
break
}
if err != nil {
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err)
return err
}
if err = visit(object.Name, object.Etag); err != nil {
return err
}
}
return nil
}
// ListObjects lists the objects/contents of the bucket whose bucket name is provided.
// the objects are returned as an Objectiterator and .Next() has to be called on them
// to loop through the Objects.
func (c *GCPClient) ListObjects(ctx context.Context, bucketName string, query *gcpstorage.Query) *gcpstorage.ObjectIterator {
items := c.Client.Bucket(bucketName).Objects(ctx, query)
return items
}
// Close closes the GCP Client and logs any useful errors
func (c *GCPClient) Close(log logr.Logger) {
// Close closes the GCP Client and logs any useful errors.
func (c *GCSClient) Close(ctx context.Context) {
log := logr.FromContextOrDiscard(ctx)
if err := c.Client.Close(); err != nil {
log.Error(err, "GCP Provider")
log.Error(err, "closing GCP client")
}
}
// ObjectIsNotFound checks if the error provided is storage.ErrObjectNotExist.
func (c *GCSClient) ObjectIsNotFound(err error) bool {
return errors.Is(err, gcpstorage.ErrObjectNotExist)
}
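Taken together, these methods line up with the BucketProvider interface on the controller side; a compile-time assertion like the one below would verify that (illustrative only, and it assumes the interface is exported from the controllers package):

package example

import (
    "github.com/fluxcd/source-controller/controllers"
    "github.com/fluxcd/source-controller/pkg/gcp"
)

// Compile-time check that the GCS client satisfies BucketProvider.
var _ controllers.BucketProvider = &gcp.GCSClient{}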

pkg/gcp/gcp_test.go

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package gcp_test
package gcp
import (
"context"
@ -32,17 +32,20 @@ import (
"time"
gcpstorage "cloud.google.com/go/storage"
"github.com/fluxcd/source-controller/pkg/gcp"
"google.golang.org/api/googleapi"
raw "google.golang.org/api/storage/v1"
"gotest.tools/assert"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"google.golang.org/api/option"
)
const (
bucketName string = "test-bucket"
objectName string = "test.yaml"
bucketName string = "test-bucket"
objectName string = "test.yaml"
objectGeneration int64 = 3
objectEtag string = "bFbHCDvedeecefdgmfmhfuRxBdcedGe96S82XJOAXxjJpk="
)
var (
@ -50,12 +53,34 @@ var (
client *gcpstorage.Client
close func()
err error
secret = corev1.Secret{
ObjectMeta: v1.ObjectMeta{
Name: "gcp-secret",
Namespace: "default",
},
Data: map[string][]byte{
"serviceaccount": []byte("ewogICAgInR5cGUiOiAic2VydmljZV9hY2NvdW50IiwKICAgICJwcm9qZWN0X2lkIjogInBvZGluZm8iLAogICAgInByaXZhdGVfa2V5X2lkIjogIjI4cXdnaDNnZGY1aGozZ2I1ZmozZ3N1NXlmZ2gzNGY0NTMyNDU2OGh5MiIsCiAgICAicHJpdmF0ZV9rZXkiOiAiLS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tXG5Id2V0aGd5MTIzaHVnZ2hoaGJkY3U2MzU2ZGd5amhzdmd2R0ZESFlnY2RqYnZjZGhic3g2M2Ncbjc2dGd5Y2ZlaHVoVkdURllmdzZ0N3lkZ3lWZ3lkaGV5aHVnZ3ljdWhland5NnQzNWZ0aHl1aGVndmNldGZcblRGVUhHVHlnZ2h1Ymh4ZTY1eWd0NnRneWVkZ3kzMjZodWN5dnN1aGJoY3Zjc2poY3NqaGNzdmdkdEhGQ0dpXG5IY3llNnR5eWczZ2Z5dWhjaGNzYmh5Z2NpamRiaHl5VEY2NnR1aGNldnVoZGNiaHVoaHZmdGN1aGJoM3VoN3Q2eVxuZ2d2ZnRVSGJoNnQ1cmZ0aGh1R1ZSdGZqaGJmY3JkNXI2N3l1aHV2Z0ZUWWpndnRmeWdoYmZjZHJoeWpoYmZjdGZkZnlodmZnXG50Z3ZnZ3RmeWdodmZ0NnR1Z3ZURjVyNjZ0dWpoZ3ZmcnR5aGhnZmN0Nnk3eXRmcjVjdHZnaGJoaHZ0Z2hoanZjdHRmeWNmXG5mZnhmZ2hqYnZnY2d5dDY3dWpiZ3ZjdGZ5aFZDN3VodmdjeWp2aGhqdnl1amNcbmNnZ2hndmdjZmhnZzc2NTQ1NHRjZnRoaGdmdHloaHZ2eXZ2ZmZnZnJ5eXU3N3JlcmVkc3dmdGhoZ2ZjZnR5Y2ZkcnR0ZmhmL1xuLS0tLS1FTkQgUFJJVkFURSBLRVktLS0tLVxuIiwKICAgICJjbGllbnRfZW1haWwiOiAidGVzdEBwb2RpbmZvLmlhbS5nc2VydmljZWFjY291bnQuY29tIiwKICAgICJjbGllbnRfaWQiOiAiMzI2NTc2MzQ2Nzg3NjI1MzY3NDYiLAogICAgImF1dGhfdXJpIjogImh0dHBzOi8vYWNjb3VudHMuZ29vZ2xlLmNvbS9vL29hdXRoMi9hdXRoIiwKICAgICJ0b2tlbl91cmkiOiAiaHR0cHM6Ly9vYXV0aDIuZ29vZ2xlYXBpcy5jb20vdG9rZW4iLAogICAgImF1dGhfcHJvdmlkZXJfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9vYXV0aDIvdjEvY2VydHMiLAogICAgImNsaWVudF94NTA5X2NlcnRfdXJsIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL3JvYm90L3YxL21ldGFkYXRhL3g1MDkvdGVzdCU0MHBvZGluZm8uaWFtLmdzZXJ2aWNlYWNjb3VudC5jb20iCn0="),
},
Type: "Opaque",
}
badSecret = corev1.Secret{
ObjectMeta: v1.ObjectMeta{
Name: "gcp-secret",
Namespace: "default",
},
Data: map[string][]byte{
"username": []byte("test-user"),
},
Type: "Opaque",
}
)
func TestMain(m *testing.M) {
hc, close = newTestServer(func(w http.ResponseWriter, r *http.Request) {
io.Copy(io.Discard, r.Body)
if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s?alt=json&prettyPrint=false&projection=full", bucketName) {
switch r.RequestURI {
case fmt.Sprintf("/storage/v1/b/%s?alt=json&prettyPrint=false&projection=full", bucketName):
w.WriteHeader(200)
response := getBucket()
jsonResponse, err := json.Marshal(response)
@ -66,7 +91,7 @@ func TestMain(m *testing.M) {
if err != nil {
log.Fatalf("error writing jsonResponse %v\n", err)
}
} else if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName) {
case fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName):
w.WriteHeader(200)
response := getObject()
jsonResponse, err := json.Marshal(response)
@ -77,9 +102,10 @@ func TestMain(m *testing.M) {
if err != nil {
log.Fatalf("error writing jsonResponse %v\n", err)
}
} else if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o?alt=json&delimiter=&endOffset=&pageToken=&prefix=&prettyPrint=false&projection=full&startOffset=&versions=false", bucketName) {
case fmt.Sprintf("/storage/v1/b/%s/o?alt=json&delimiter=&endOffset=&pageToken=&prefix=&prettyPrint=false&projection=full&startOffset=&versions=false", bucketName):
w.WriteHeader(200)
response := getObject()
response := &raw.Objects{}
response.Items = append(response.Items, getObject())
jsonResponse, err := json.Marshal(response)
if err != nil {
log.Fatalf("error marshalling response %v\n", err)
@ -88,14 +114,16 @@ func TestMain(m *testing.M) {
if err != nil {
log.Fatalf("error writing jsonResponse %v\n", err)
}
} else if r.RequestURI == fmt.Sprintf("/%s/test.yaml", bucketName) || r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName) {
case fmt.Sprintf("/%s/test.yaml", bucketName),
fmt.Sprintf("/%s/test.yaml?ifGenerationMatch=%d", bucketName, objectGeneration),
fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName):
w.WriteHeader(200)
response := getObjectFile()
_, err = w.Write([]byte(response))
if err != nil {
log.Fatalf("error writing response %v\n", err)
}
} else {
default:
w.WriteHeader(404)
}
})
@ -109,14 +137,15 @@ func TestMain(m *testing.M) {
os.Exit(run)
}
func TestNewClient(t *testing.T) {
gcpClient, err := gcp.NewClient(context.Background(), option.WithHTTPClient(hc))
assert.NilError(t, err)
assert.Assert(t, gcpClient != nil)
func TestNewClientWithSecretErr(t *testing.T) {
gcpClient, err := NewClient(context.Background(), secret.DeepCopy())
t.Log(err)
assert.Error(t, err, "dialing: invalid character 'e' looking for beginning of value")
assert.Assert(t, gcpClient == nil)
}
func TestBucketExists(t *testing.T) {
gcpClient := &gcp.GCPClient{
gcpClient := &GCSClient{
Client: client,
}
exists, err := gcpClient.BucketExists(context.Background(), bucketName)
@ -126,7 +155,7 @@ func TestBucketExists(t *testing.T) {
func TestBucketNotExists(t *testing.T) {
bucket := "notexistsbucket"
gcpClient := &gcp.GCPClient{
gcpClient := &GCSClient{
Client: client,
}
exists, err := gcpClient.BucketExists(context.Background(), bucket)
@ -134,55 +163,57 @@ func TestBucketNotExists(t *testing.T) {
assert.Assert(t, !exists)
}
func TestObjectExists(t *testing.T) {
gcpClient := &gcp.GCPClient{
func TestVisitObjects(t *testing.T) {
gcpClient := &GCSClient{
Client: client,
}
exists, err := gcpClient.ObjectExists(context.Background(), bucketName, objectName)
if err == gcpstorage.ErrObjectNotExist {
assert.NilError(t, err)
}
keys := []string{}
etags := []string{}
err := gcpClient.VisitObjects(context.Background(), bucketName, func(key, etag string) error {
keys = append(keys, key)
etags = append(etags, etag)
return nil
})
assert.NilError(t, err)
assert.Assert(t, exists)
assert.DeepEqual(t, keys, []string{objectName})
assert.DeepEqual(t, etags, []string{objectEtag})
}
func TestObjectNotExists(t *testing.T) {
object := "doesnotexists.yaml"
gcpClient := &gcp.GCPClient{
func TestVisitObjectsErr(t *testing.T) {
gcpClient := &GCSClient{
Client: client,
}
exists, err := gcpClient.ObjectExists(context.Background(), bucketName, object)
assert.Error(t, err, gcpstorage.ErrObjectNotExist.Error())
assert.Assert(t, !exists)
badBucketName := "bad-bucket"
err := gcpClient.VisitObjects(context.Background(), badBucketName, func(key, etag string) error {
return nil
})
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName))
}
func TestListObjects(t *testing.T) {
gcpClient := &gcp.GCPClient{
func TestVisitObjectsCallbackErr(t *testing.T) {
gcpClient := &GCSClient{
Client: client,
}
objectIterator := gcpClient.ListObjects(context.Background(), bucketName, nil)
for {
_, err := objectIterator.Next()
if err == gcp.IteratorDone {
break
}
assert.NilError(t, err)
}
assert.Assert(t, objectIterator != nil)
mockErr := fmt.Errorf("mock")
err := gcpClient.VisitObjects(context.Background(), bucketName, func(key, etag string) error {
return mockErr
})
assert.Error(t, err, mockErr.Error())
}
func TestFGetObject(t *testing.T) {
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
gcpClient := &gcp.GCPClient{
gcpClient := &GCSClient{
Client: client,
}
localPath := filepath.Join(tempDir, objectName)
err = gcpClient.FGetObject(context.Background(), bucketName, objectName, localPath)
etag, err := gcpClient.FGetObject(context.Background(), bucketName, objectName, localPath)
if err != io.EOF {
assert.NilError(t, err)
}
assert.Equal(t, etag, objectEtag)
}
func TestFGetObjectNotExists(t *testing.T) {
@ -190,24 +221,25 @@ func TestFGetObjectNotExists(t *testing.T) {
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
gcpClient := &gcp.GCPClient{
gcsClient := &GCSClient{
Client: client,
}
localPath := filepath.Join(tempDir, object)
err = gcpClient.FGetObject(context.Background(), bucketName, object, localPath)
_, err = gcsClient.FGetObject(context.Background(), bucketName, object, localPath)
if err != io.EOF {
assert.Error(t, err, "storage: object doesn't exist")
assert.Check(t, gcsClient.ObjectIsNotFound(err))
}
}
func TestFGetObjectDirectoryIsFileName(t *testing.T) {
tempDir, err := os.MkdirTemp("", bucketName)
defer os.RemoveAll(tempDir)
assert.NilError(t, err)
gcpClient := &gcp.GCPClient{
defer os.RemoveAll(tempDir)
gcpClient := &GCSClient{
Client: client,
}
err = gcpClient.FGetObject(context.Background(), bucketName, objectName, tempDir)
_, err = gcpClient.FGetObject(context.Background(), bucketName, objectName, tempDir)
if err != io.EOF {
assert.Error(t, err, "filename is a directory")
}
@ -216,35 +248,27 @@ func TestFGetObjectDirectoryIsFileName(t *testing.T) {
func TestValidateSecret(t *testing.T) {
t.Parallel()
testCases := []struct {
title string
secret map[string][]byte
name string
secret *corev1.Secret
error bool
}{
{
"Test Case 1",
map[string][]byte{
"serviceaccount": []byte("serviceaccount"),
},
"Service Account",
false,
name: "valid secret",
secret: secret.DeepCopy(),
},
{
"Test Case 2",
map[string][]byte{
"data": []byte("data"),
},
"Service Account",
true,
name: "invalid secret",
secret: badSecret.DeepCopy(),
error: true,
},
}
for _, testCase := range testCases {
testCase := testCase
t.Run(testCase.title, func(t *testing.T) {
tt := testCase
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
err := gcp.ValidateSecret(testCase.secret, testCase.name)
if testCase.error {
assert.Error(t, err, fmt.Sprintf("invalid '%v' secret data: required fields 'serviceaccount'", testCase.name))
err := ValidateSecret(tt.secret)
if tt.error {
assert.Error(t, err, fmt.Sprintf("invalid '%v' secret data: required fields 'serviceaccount'", tt.secret.Name))
} else {
assert.NilError(t, err)
}
@ -280,7 +304,10 @@ func getObject() *raw.Object {
ContentLanguage: "en-us",
Size: 1 << 20,
CustomTime: customTime.Format(time.RFC3339),
Md5Hash: "bFbHCDvedeecefdgmfmhfuRxBdcedGe96S82XJOAXxjJpk=",
Generation: objectGeneration,
Metageneration: 3,
Etag: objectEtag,
Md5Hash: objectEtag,
}
}

pkg/minio/minio.go Normal file

@ -0,0 +1,135 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package minio
import (
"context"
"errors"
"fmt"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/s3utils"
corev1 "k8s.io/api/core/v1"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
)
// MinioClient is a minimal Minio client for fetching files from S3-compatible
// storage APIs.
type MinioClient struct {
*minio.Client
}
// NewClient creates a new Minio storage client.
func NewClient(bucket *sourcev1.Bucket, secret *corev1.Secret) (*MinioClient, error) {
opt := minio.Options{
Region: bucket.Spec.Region,
Secure: !bucket.Spec.Insecure,
BucketLookup: minio.BucketLookupPath,
}
if secret != nil {
var accessKey, secretKey string
if k, ok := secret.Data["accesskey"]; ok {
accessKey = string(k)
}
if k, ok := secret.Data["secretkey"]; ok {
secretKey = string(k)
}
if accessKey != "" && secretKey != "" {
opt.Creds = credentials.NewStaticV4(accessKey, secretKey, "")
}
} else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider {
opt.Creds = credentials.NewIAM("")
}
client, err := minio.New(bucket.Spec.Endpoint, &opt)
if err != nil {
return nil, err
}
return &MinioClient{Client: client}, nil
}
// ValidateSecret validates the credential secret. The provided Secret may
// be nil.
func ValidateSecret(secret *corev1.Secret) error {
if secret == nil {
return nil
}
err := fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
if _, ok := secret.Data["accesskey"]; !ok {
return err
}
if _, ok := secret.Data["secretkey"]; !ok {
return err
}
return nil
}
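A hedged sketch of one plausible way to pair ValidateSecret with NewClient, checking the (optional) credentials Secret before constructing the client; the helper name is made up:

package example

import (
    corev1 "k8s.io/api/core/v1"

    sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
    "github.com/fluxcd/source-controller/pkg/minio"
)

// newValidatedClient validates the Secret (which may be nil) and then builds
// the Minio client; an illustrative helper, not part of the change set.
func newValidatedClient(bucket *sourcev1.Bucket, secret *corev1.Secret) (*minio.MinioClient, error) {
    if err := minio.ValidateSecret(secret); err != nil {
        return nil, err
    }
    return minio.NewClient(bucket, secret)
}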
// FGetObject gets the object from the provided object storage bucket, and
// writes it to targetPath.
// It returns the etag of the successfully fetched file, or any error.
func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) (string, error) {
stat, err := c.Client.StatObject(ctx, bucketName, objectName, minio.GetObjectOptions{})
if err != nil {
return "", err
}
opts := minio.GetObjectOptions{}
if err = opts.SetMatchETag(stat.ETag); err != nil {
return "", err
}
if err = c.Client.FGetObject(ctx, bucketName, objectName, localPath, opts); err != nil {
return "", err
}
return stat.ETag, nil
}
// VisitObjects iterates over the items in the provided object storage
// bucket, calling visit for every item.
// If the underlying client or the visit callback returns an error,
// it returns early.
func (c *MinioClient) VisitObjects(ctx context.Context, bucketName string, visit func(key, etag string) error) error {
for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
Recursive: true,
UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()),
}) {
if object.Err != nil {
err := fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, object.Err)
return err
}
if err := visit(object.Key, object.ETag); err != nil {
return err
}
}
return nil
}
// ObjectIsNotFound checks if the error provided is a minio.ErrorResponse
// with "NoSuchKey" code.
func (c *MinioClient) ObjectIsNotFound(err error) bool {
if resp := new(minio.ErrorResponse); errors.As(err, resp) {
return resp.Code == "NoSuchKey"
}
return false
}
// Close closes the Minio Client and logs any useful errors.
func (c *MinioClient) Close(_ context.Context) {
// Minio client does not provide a close method
}
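To tie the pieces together, a rough end-to-end sketch (illustrative only, not part of the change set; the helper name and the flat key-to-path mapping are assumptions) that lists a bucket and downloads every object, skipping objects that disappear mid-iteration:

package example

import (
    "context"
    "fmt"
    "path/filepath"

    corev1 "k8s.io/api/core/v1"

    sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
    "github.com/fluxcd/source-controller/pkg/minio"
)

// fetchAll downloads every object of the configured bucket into dir.
func fetchAll(ctx context.Context, bucket *sourcev1.Bucket, secret *corev1.Secret, dir string) error {
    client, err := minio.NewClient(bucket, secret)
    if err != nil {
        return err
    }
    defer client.Close(ctx)

    name := bucket.Spec.BucketName
    exists, err := client.BucketExists(ctx, name)
    if err != nil {
        return err
    }
    if !exists {
        return fmt.Errorf("bucket '%s' does not exist", name)
    }

    return client.VisitObjects(ctx, name, func(key, etag string) error {
        if _, err := client.FGetObject(ctx, name, key, filepath.Join(dir, key)); err != nil {
            if client.ObjectIsNotFound(err) {
                // The object was removed while iterating; skip it.
                return nil
            }
            return err
        }
        return nil
    })
}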

pkg/minio/minio_test.go Normal file

@ -0,0 +1,283 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package minio
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"testing"
"github.com/fluxcd/pkg/apis/meta"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"github.com/google/uuid"
miniov7 "github.com/minio/minio-go/v7"
"gotest.tools/assert"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
objectName string = "test.yaml"
objectEtag string = "2020beab5f1711919157756379622d1d"
region string = "us-east-1"
)
var (
minioClient *MinioClient
bucketName = "test-bucket-minio" + uuid.New().String()
secret = corev1.Secret{
ObjectMeta: v1.ObjectMeta{
Name: "minio-secret",
Namespace: "default",
},
Data: map[string][]byte{
"accesskey": []byte("Q3AM3UQ867SPQQA43P2F"),
"secretkey": []byte("zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"),
},
Type: "Opaque",
}
emptySecret = corev1.Secret{
ObjectMeta: v1.ObjectMeta{
Name: "minio-secret",
Namespace: "default",
},
Data: map[string][]byte{},
Type: "Opaque",
}
bucket = sourcev1.Bucket{
ObjectMeta: v1.ObjectMeta{
Name: "minio-test-bucket",
Namespace: "default",
},
Spec: sourcev1.BucketSpec{
BucketName: bucketName,
Endpoint: "play.min.io",
Region: region,
Provider: "generic",
Insecure: true,
SecretRef: &meta.LocalObjectReference{
Name: secret.Name,
},
},
}
bucketAwsProvider = sourcev1.Bucket{
ObjectMeta: v1.ObjectMeta{
Name: "minio-test-bucket",
Namespace: "default",
},
Spec: sourcev1.BucketSpec{
BucketName: bucketName,
Endpoint: "play.min.io",
Region: region,
Provider: "aws",
Insecure: true,
},
}
)
func TestMain(m *testing.M) {
var err error
ctx := context.Background()
minioClient, err = NewClient(bucket.DeepCopy(), secret.DeepCopy())
if err != nil {
log.Fatal(err)
}
createBucket(ctx)
addObjectToBucket(ctx)
run := m.Run()
removeObjectFromBucket(ctx)
deleteBucket(ctx)
os.Exit(run)
}
func TestNewClient(t *testing.T) {
minioClient, err := NewClient(bucket.DeepCopy(), secret.DeepCopy())
assert.NilError(t, err)
assert.Assert(t, minioClient != nil)
}
func TestNewClientEmptySecret(t *testing.T) {
minioClient, err := NewClient(bucket.DeepCopy(), emptySecret.DeepCopy())
assert.NilError(t, err)
assert.Assert(t, minioClient != nil)
}
func TestNewClientAwsProvider(t *testing.T) {
minioClient, err := NewClient(bucketAwsProvider.DeepCopy(), nil)
assert.NilError(t, err)
assert.Assert(t, minioClient != nil)
}
func TestBucketExists(t *testing.T) {
ctx := context.Background()
exists, err := minioClient.BucketExists(ctx, bucketName)
assert.NilError(t, err)
assert.Assert(t, exists)
}
func TestBucketNotExists(t *testing.T) {
ctx := context.Background()
exists, err := minioClient.BucketExists(ctx, "notexistsbucket")
assert.NilError(t, err)
assert.Assert(t, !exists)
}
func TestFGetObject(t *testing.T) {
ctx := context.Background()
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
_, err = minioClient.FGetObject(ctx, bucketName, objectName, path)
assert.NilError(t, err)
}
func TestFGetObjectNotExists(t *testing.T) {
ctx := context.Background()
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
badKey := "invalid.txt"
path := filepath.Join(tempDir, badKey)
_, err = minioClient.FGetObject(ctx, bucketName, badKey, path)
assert.Error(t, err, "The specified key does not exist.")
assert.Check(t, minioClient.ObjectIsNotFound(err))
}
func TestVisitObjects(t *testing.T) {
keys := []string{}
etags := []string{}
err := minioClient.VisitObjects(context.TODO(), bucketName, func(key, etag string) error {
keys = append(keys, key)
etags = append(etags, etag)
return nil
})
assert.NilError(t, err)
assert.DeepEqual(t, keys, []string{objectName})
assert.DeepEqual(t, etags, []string{objectEtag})
}
func TestVisitObjectsErr(t *testing.T) {
ctx := context.Background()
badBucketName := "bad-bucket"
err := minioClient.VisitObjects(ctx, badBucketName, func(string, string) error {
return nil
})
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName))
}
func TestVisitObjectsCallbackErr(t *testing.T) {
mockErr := fmt.Errorf("mock")
err := minioClient.VisitObjects(context.TODO(), bucketName, func(key, etag string) error {
return mockErr
})
assert.Error(t, err, mockErr.Error())
}
func TestValidateSecret(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
secret *corev1.Secret
error bool
}{
{
name: "valid secret",
secret: secret.DeepCopy(),
},
{
name: "nil secret",
secret: nil,
},
{
name: "invalid secret",
secret: emptySecret.DeepCopy(),
error: true,
},
}
for _, testCase := range testCases {
tt := testCase
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
err := ValidateSecret(tt.secret)
if tt.error {
assert.Error(t, err, fmt.Sprintf("invalid '%v' secret data: required fields 'accesskey' and 'secretkey'", tt.secret.Name))
} else {
assert.NilError(t, err)
}
})
}
}
func createBucket(ctx context.Context) {
if err := minioClient.Client.MakeBucket(ctx, bucketName, miniov7.MakeBucketOptions{Region: region}); err != nil {
exists, errBucketExists := minioClient.BucketExists(ctx, bucketName)
if errBucketExists == nil && exists {
deleteBucket(ctx)
} else {
log.Fatalln(err)
}
}
}
func deleteBucket(ctx context.Context) {
if err := minioClient.Client.RemoveBucket(ctx, bucketName); err != nil {
log.Println(err)
}
}
func addObjectToBucket(ctx context.Context) {
fileReader := strings.NewReader(getObjectFile())
fileSize := fileReader.Size()
_, err := minioClient.Client.PutObject(ctx, bucketName, objectName, fileReader, fileSize, miniov7.PutObjectOptions{
ContentType: "text/x-yaml",
})
if err != nil {
log.Println(err)
}
}
func removeObjectFromBucket(ctx context.Context) {
if err := minioClient.Client.RemoveObject(ctx, bucketName, objectName, miniov7.RemoveObjectOptions{
GovernanceBypass: true,
}); err != nil {
log.Println(err)
}
}
func getObjectFile() string {
return `
apiVersion: source.toolkit.fluxcd.io/v1beta2
kind: Bucket
metadata:
name: podinfo
namespace: default
spec:
interval: 5m
provider: aws
bucketName: podinfo
endpoint: s3.amazonaws.com
region: us-east-1
timeout: 30s
`
}