Introduce BucketProvider interface
This commit introduces a BucketProvider interface for fetch operations against object storage provider buckets. Allowing for easier introduction of new provider implementations. The algorithm for conditionally downloading object files is the same, whether you are using GCP storage or an S3/Minio-compatible bucket. The only thing that differs is how the respective clients handle enumerating through the objects in the bucket; by implementing just that in each provider, I can have the select-and-fetch code in once place. The client implementations do now include safe-guards to ensure the fetched object is the same as metadata has been collected for. In addition, minor changes have been made to the object fetch operation to take into account that: - Etags can change between composition of index and actual fetch, in which case the etag is now updated. - Objects can disappear between composition of index and actual fetch, in which case the item is removed from the index. Lastly, the requirement for authentication has been removed (and not referring to a Secret at all is thus allowed), to provide support for e.g. public buckets. Co-authored-by: Hidde Beydals <hello@hidde.co> Co-authored by: Michael Bridgen <michael@weave.works> Signed-off-by: pa250194 <pa250194@ncr.com>
This commit is contained in:
parent
bae1b1094a
commit
ed6c6ebc3c
|
|
@ -25,17 +25,11 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
gcpstorage "cloud.google.com/go/storage"
|
|
||||||
"github.com/fluxcd/pkg/runtime/events"
|
|
||||||
"github.com/fluxcd/source-controller/pkg/gcp"
|
|
||||||
"github.com/minio/minio-go/v7"
|
|
||||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
|
||||||
"github.com/minio/minio-go/v7/pkg/s3utils"
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
"golang.org/x/sync/semaphore"
|
"golang.org/x/sync/semaphore"
|
||||||
"google.golang.org/api/option"
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
|
@ -49,6 +43,7 @@ import (
|
||||||
"github.com/fluxcd/pkg/apis/meta"
|
"github.com/fluxcd/pkg/apis/meta"
|
||||||
"github.com/fluxcd/pkg/runtime/conditions"
|
"github.com/fluxcd/pkg/runtime/conditions"
|
||||||
helper "github.com/fluxcd/pkg/runtime/controller"
|
helper "github.com/fluxcd/pkg/runtime/controller"
|
||||||
|
"github.com/fluxcd/pkg/runtime/events"
|
||||||
"github.com/fluxcd/pkg/runtime/patch"
|
"github.com/fluxcd/pkg/runtime/patch"
|
||||||
"github.com/fluxcd/pkg/runtime/predicates"
|
"github.com/fluxcd/pkg/runtime/predicates"
|
||||||
|
|
||||||
|
|
@ -56,9 +51,23 @@ import (
|
||||||
serror "github.com/fluxcd/source-controller/internal/error"
|
serror "github.com/fluxcd/source-controller/internal/error"
|
||||||
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
|
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
|
||||||
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
|
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
|
||||||
|
"github.com/fluxcd/source-controller/pkg/gcp"
|
||||||
|
"github.com/fluxcd/source-controller/pkg/minio"
|
||||||
"github.com/fluxcd/source-controller/pkg/sourceignore"
|
"github.com/fluxcd/source-controller/pkg/sourceignore"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// maxConcurrentBucketFetches is the upper bound on the goroutines used to
|
||||||
|
// fetch bucket objects. It's important to have a bound, to avoid
|
||||||
|
// using arbitrary amounts of memory; the actual number is chosen
|
||||||
|
// according to the queueing rule of thumb with some conservative
|
||||||
|
// parameters:
|
||||||
|
// s > Nr / T
|
||||||
|
// N (number of requestors, i.e., objects to fetch) = 10000
|
||||||
|
// r (service time -- fetch duration) = 0.01s (~ a megabyte file over 1Gb/s)
|
||||||
|
// T (total time available) = 1s
|
||||||
|
// -> s > 100
|
||||||
|
const maxConcurrentBucketFetches = 100
|
||||||
|
|
||||||
// bucketReadyConditions contains all the conditions information needed
|
// bucketReadyConditions contains all the conditions information needed
|
||||||
// for Bucket Ready status conditions summary calculation.
|
// for Bucket Ready status conditions summary calculation.
|
||||||
var bucketReadyConditions = summarize.Conditions{
|
var bucketReadyConditions = summarize.Conditions{
|
||||||
|
|
@ -103,9 +112,107 @@ type BucketReconcilerOptions struct {
|
||||||
MaxConcurrentReconciles int
|
MaxConcurrentReconciles int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BucketProvider is an interface for fetching objects from a storage provider
|
||||||
|
// bucket.
|
||||||
|
type BucketProvider interface {
|
||||||
|
// BucketExists returns if an object storage bucket with the provided name
|
||||||
|
// exists, or returns a (client) error.
|
||||||
|
BucketExists(ctx context.Context, bucketName string) (bool, error)
|
||||||
|
// FGetObject gets the object from the provided object storage bucket, and
|
||||||
|
// writes it to targetPath.
|
||||||
|
// It returns the etag of the successfully fetched file, or any error.
|
||||||
|
FGetObject(ctx context.Context, bucketName, objectKey, targetPath string) (etag string, err error)
|
||||||
|
// VisitObjects iterates over the items in the provided object storage
|
||||||
|
// bucket, calling visit for every item.
|
||||||
|
// If the underlying client or the visit callback returns an error,
|
||||||
|
// it returns early.
|
||||||
|
VisitObjects(ctx context.Context, bucketName string, visit func(key, etag string) error) error
|
||||||
|
// ObjectIsNotFound returns true if the given error indicates an object
|
||||||
|
// could not be found.
|
||||||
|
ObjectIsNotFound(error) bool
|
||||||
|
// Close closes the provider's client, if supported.
|
||||||
|
Close(context.Context)
|
||||||
|
}
|
||||||
|
|
||||||
// bucketReconcilerFunc is the function type for all the bucket reconciler
|
// bucketReconcilerFunc is the function type for all the bucket reconciler
|
||||||
// functions.
|
// functions.
|
||||||
type bucketReconcilerFunc func(ctx context.Context, obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error)
|
type bucketReconcilerFunc func(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error)
|
||||||
|
|
||||||
|
// etagIndex is an index of storage object keys and their Etag values.
|
||||||
|
type etagIndex struct {
|
||||||
|
sync.RWMutex
|
||||||
|
index map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// newEtagIndex returns a new etagIndex with an empty initialized index.
|
||||||
|
func newEtagIndex() *etagIndex {
|
||||||
|
return &etagIndex{
|
||||||
|
index: make(map[string]string),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *etagIndex) Add(key, etag string) {
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
i.index[key] = etag
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *etagIndex) Delete(key string) {
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
delete(i.index, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *etagIndex) Get(key string) string {
|
||||||
|
i.RLock()
|
||||||
|
defer i.RUnlock()
|
||||||
|
return i.index[key]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *etagIndex) Has(key string) bool {
|
||||||
|
i.RLock()
|
||||||
|
defer i.RUnlock()
|
||||||
|
_, ok := i.index[key]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *etagIndex) Index() map[string]string {
|
||||||
|
i.RLock()
|
||||||
|
defer i.RUnlock()
|
||||||
|
index := make(map[string]string)
|
||||||
|
for k, v := range i.index {
|
||||||
|
index[k] = v
|
||||||
|
}
|
||||||
|
return index
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *etagIndex) Len() int {
|
||||||
|
i.RLock()
|
||||||
|
defer i.RUnlock()
|
||||||
|
return len(i.index)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Revision calculates the SHA256 checksum of the index.
|
||||||
|
// The keys are stable sorted, and the SHA256 sum is then calculated for the
|
||||||
|
// string representation of the key/value pairs, each pair written on a newline
|
||||||
|
// with a space between them. The sum result is returned as a string.
|
||||||
|
func (i *etagIndex) Revision() (string, error) {
|
||||||
|
i.RLock()
|
||||||
|
defer i.RUnlock()
|
||||||
|
keyIndex := make([]string, 0, len(i.index))
|
||||||
|
for k := range i.index {
|
||||||
|
keyIndex = append(keyIndex, k)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Strings(keyIndex)
|
||||||
|
sum := sha256.New()
|
||||||
|
for _, k := range keyIndex {
|
||||||
|
if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i.index[k]))); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%x", sum.Sum(nil)), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||||
return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
|
return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
|
||||||
|
|
@ -201,9 +308,6 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
|
||||||
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
|
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
|
||||||
}
|
}
|
||||||
|
|
||||||
index := make(etagIndex)
|
|
||||||
var artifact sourcev1.Artifact
|
|
||||||
|
|
||||||
// Create temp working dir
|
// Create temp working dir
|
||||||
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
|
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -215,10 +319,14 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
// Run the sub-reconcilers and build the result of reconciliation.
|
// Run the sub-reconcilers and build the result of reconciliation.
|
||||||
var res sreconcile.Result
|
var (
|
||||||
var resErr error
|
res sreconcile.Result
|
||||||
|
resErr error
|
||||||
|
index = newEtagIndex()
|
||||||
|
)
|
||||||
|
|
||||||
for _, rec := range reconcilers {
|
for _, rec := range reconcilers {
|
||||||
recResult, err := rec(ctx, obj, index, &artifact, tmpDir)
|
recResult, err := rec(ctx, obj, index, tmpDir)
|
||||||
// Exit immediately on ResultRequeue.
|
// Exit immediately on ResultRequeue.
|
||||||
if recResult == sreconcile.ResultRequeue {
|
if recResult == sreconcile.ResultRequeue {
|
||||||
return sreconcile.ResultRequeue, nil
|
return sreconcile.ResultRequeue, nil
|
||||||
|
|
@ -241,8 +349,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
|
||||||
// All artifacts for the resource except for the current one are garbage collected from the storage.
|
// All artifacts for the resource except for the current one are garbage collected from the storage.
|
||||||
// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
|
// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
|
||||||
// If the hostname of the URLs on the object do not match the current storage server hostname, they are updated.
|
// If the hostname of the URLs on the object do not match the current storage server hostname, they are updated.
|
||||||
func (r *BucketReconciler) reconcileStorage(ctx context.Context,
|
func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) {
|
||||||
obj *sourcev1.Bucket, _ etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
|
||||||
// Garbage collect previous advertised artifact(s) from storage
|
// Garbage collect previous advertised artifact(s) from storage
|
||||||
_ = r.garbageCollect(ctx, obj)
|
_ = r.garbageCollect(ctx, obj)
|
||||||
|
|
||||||
|
|
@ -270,287 +377,50 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context,
|
||||||
// result.
|
// result.
|
||||||
// If a SecretRef is defined, it attempts to fetch the Secret before calling the provider. If the fetch of the Secret
|
// If a SecretRef is defined, it attempts to fetch the Secret before calling the provider. If the fetch of the Secret
|
||||||
// fails, it records v1beta1.FetchFailedCondition=True and returns early.
|
// fails, it records v1beta1.FetchFailedCondition=True and returns early.
|
||||||
func (r *BucketReconciler) reconcileSource(ctx context.Context,
|
func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
|
||||||
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
secret, err := r.getBucketSecret(ctx, obj)
|
||||||
var secret *corev1.Secret
|
if err != nil {
|
||||||
if obj.Spec.SecretRef != nil {
|
e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason}
|
||||||
secretName := types.NamespacedName{
|
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
|
||||||
Namespace: obj.GetNamespace(),
|
|
||||||
Name: obj.Spec.SecretRef.Name,
|
|
||||||
}
|
|
||||||
secret = &corev1.Secret{}
|
|
||||||
if err := r.Get(ctx, secretName, secret); err != nil {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to get secret '%s': %w", secretName.String(), err),
|
|
||||||
Reason: sourcev1.AuthenticationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, e.Err.Error())
|
|
||||||
// Return error as the world as observed may change
|
// Return error as the world as observed may change
|
||||||
return sreconcile.ResultEmpty, e
|
return sreconcile.ResultEmpty, e
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
// Construct provider client
|
||||||
|
var provider BucketProvider
|
||||||
switch obj.Spec.Provider {
|
switch obj.Spec.Provider {
|
||||||
case sourcev1.GoogleBucketProvider:
|
case sourcev1.GoogleBucketProvider:
|
||||||
return r.reconcileGCPSource(ctx, obj, index, artifact, secret, dir)
|
if err = gcp.ValidateSecret(secret); err != nil {
|
||||||
|
e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason}
|
||||||
|
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
|
||||||
|
return sreconcile.ResultEmpty, e
|
||||||
|
}
|
||||||
|
if provider, err = gcp.NewClient(ctx, secret); err != nil {
|
||||||
|
e := &serror.Event{Err: err, Reason: "ClientError"}
|
||||||
|
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
|
||||||
|
return sreconcile.ResultEmpty, e
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
return r.reconcileMinioSource(ctx, obj, index, artifact, secret, dir)
|
if err = minio.ValidateSecret(secret); err != nil {
|
||||||
}
|
e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason}
|
||||||
}
|
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
|
||||||
|
|
||||||
// reconcileMinioSource ensures the upstream Minio client compatible bucket can be reached and downloaded from using the
|
|
||||||
// declared configuration, and observes its state.
|
|
||||||
//
|
|
||||||
// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into
|
|
||||||
// account. In case of an error during the download process (including transient errors), it records
|
|
||||||
// v1beta1.FetchFailedCondition=True and returns early.
|
|
||||||
// On a successful download, it removes v1beta1.FetchFailedCondition, and compares the current revision of HEAD to
|
|
||||||
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
|
|
||||||
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
|
|
||||||
func (r *BucketReconciler) reconcileMinioSource(ctx context.Context,
|
|
||||||
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, secret *corev1.Secret, dir string) (sreconcile.Result, error) {
|
|
||||||
// Build the client with the configuration from the object and secret
|
|
||||||
s3Client, err := r.buildMinioClient(obj, secret)
|
|
||||||
if err != nil {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to construct S3 client: %w", err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
// Return error as the contents of the secret may change
|
|
||||||
return sreconcile.ResultEmpty, e
|
return sreconcile.ResultEmpty, e
|
||||||
}
|
}
|
||||||
|
if provider, err = minio.NewClient(obj, secret); err != nil {
|
||||||
// Confirm bucket exists
|
e := &serror.Event{Err: err, Reason: "ClientError"}
|
||||||
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
|
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
|
||||||
defer cancel()
|
|
||||||
exists, err := s3Client.BucketExists(ctxTimeout, obj.Spec.BucketName)
|
|
||||||
if err != nil {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to verify existence of bucket '%s': %w", obj.Spec.BucketName, err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
|
||||||
}
|
|
||||||
if !exists {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
|
||||||
}
|
|
||||||
|
|
||||||
// Look for file with ignore rules first
|
|
||||||
path := filepath.Join(dir, sourceignore.IgnoreFile)
|
|
||||||
if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
|
|
||||||
if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to get '%s' file: %w", sourceignore.IgnoreFile, err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
return sreconcile.ResultEmpty, e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ps, err := sourceignore.ReadIgnoreFile(path, nil)
|
|
||||||
if err != nil {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to read '%s' file: %w", sourceignore.IgnoreFile, err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
|
||||||
}
|
|
||||||
// In-spec patterns take precedence
|
|
||||||
if obj.Spec.Ignore != nil {
|
|
||||||
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
|
|
||||||
}
|
|
||||||
matcher := sourceignore.NewMatcher(ps)
|
|
||||||
|
|
||||||
// Build up an index of object keys and their etags
|
// Fetch etag index
|
||||||
// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
|
if err = fetchEtagIndex(ctx, provider, obj, index, dir); err != nil {
|
||||||
// detect both structural and file changes
|
e := &serror.Event{Err: err, Reason: sourcev1.BucketOperationFailedReason}
|
||||||
for object := range s3Client.ListObjects(ctxTimeout, obj.Spec.BucketName, minio.ListObjectsOptions{
|
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
|
||||||
Recursive: true,
|
|
||||||
UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
|
|
||||||
}) {
|
|
||||||
if err = object.Err; err != nil {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to list objects from bucket '%s': %w", obj.Spec.BucketName, err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
return sreconcile.ResultEmpty, e
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ignore directories and the .sourceignore file
|
// Calculate revision
|
||||||
if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Ignore matches
|
|
||||||
if matcher.Match(strings.Split(object.Key, "/"), false) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
index[object.Key] = object.ETag
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate revision checksum from the collected index values
|
|
||||||
revision, err := index.Revision()
|
|
||||||
if err != nil {
|
|
||||||
ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision")
|
|
||||||
return sreconcile.ResultEmpty, &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to calculate revision: %w", err),
|
|
||||||
Reason: meta.FailedReason,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !obj.GetArtifact().HasRevision(revision) {
|
|
||||||
// Mark observations about the revision on the object
|
|
||||||
message := fmt.Sprintf("new upstream revision '%s'", revision)
|
|
||||||
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
|
|
||||||
conditions.MarkReconciling(obj, "NewRevision", message)
|
|
||||||
|
|
||||||
// Download the files in parallel, but with a limited number of workers
|
|
||||||
group, groupCtx := errgroup.WithContext(ctx)
|
|
||||||
group.Go(func() error {
|
|
||||||
const workers = 4
|
|
||||||
sem := semaphore.NewWeighted(workers)
|
|
||||||
for key := range index {
|
|
||||||
k := key
|
|
||||||
if err := sem.Acquire(groupCtx, 1); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
group.Go(func() error {
|
|
||||||
defer sem.Release(1)
|
|
||||||
localPath := filepath.Join(dir, k)
|
|
||||||
if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath, minio.GetObjectOptions{}); err != nil {
|
|
||||||
return fmt.Errorf("failed to get '%s' file: %w", k, err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err = group.Wait(); err != nil {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
|
||||||
}
|
|
||||||
}
|
|
||||||
conditions.Delete(obj, sourcev1.FetchFailedCondition)
|
|
||||||
|
|
||||||
// Create potential new artifact
|
|
||||||
*artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
|
|
||||||
return sreconcile.ResultSuccess, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// reconcileGCPSource ensures the upstream Google Cloud Storage bucket can be reached and downloaded from using the
|
|
||||||
// declared configuration, and observes its state.
|
|
||||||
//
|
|
||||||
// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into
|
|
||||||
// account. In case of an error during the download process (including transient errors), it records
|
|
||||||
// v1beta1.DownloadFailedCondition=True and returns early.
|
|
||||||
// On a successful download, it removes v1beta1.DownloadFailedCondition, and compares the current revision of HEAD to
|
|
||||||
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
|
|
||||||
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
|
|
||||||
func (r *BucketReconciler) reconcileGCPSource(ctx context.Context,
|
|
||||||
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, secret *corev1.Secret, dir string) (sreconcile.Result, error) {
|
|
||||||
gcpClient, err := r.buildGCPClient(ctx, secret)
|
|
||||||
if err != nil {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to construct GCP client: %w", err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
// Return error as the contents of the secret may change
|
|
||||||
return sreconcile.ResultEmpty, e
|
|
||||||
}
|
|
||||||
defer gcpClient.Close(ctrl.LoggerFrom(ctx))
|
|
||||||
|
|
||||||
// Confirm bucket exists
|
|
||||||
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
|
|
||||||
defer cancel()
|
|
||||||
exists, err := gcpClient.BucketExists(ctxTimeout, obj.Spec.BucketName)
|
|
||||||
if err != nil {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to verify existence of bucket '%s': %w", obj.Spec.BucketName, err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
|
||||||
}
|
|
||||||
if !exists {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
|
||||||
}
|
|
||||||
|
|
||||||
// Look for file with ignore rules first
|
|
||||||
path := filepath.Join(dir, sourceignore.IgnoreFile)
|
|
||||||
if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
|
|
||||||
if err != gcpstorage.ErrObjectNotExist {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to get '%s' file: %w", sourceignore.IgnoreFile, err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ps, err := sourceignore.ReadIgnoreFile(path, nil)
|
|
||||||
if err != nil {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to read '%s' file: %w", sourceignore.IgnoreFile, err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
|
||||||
}
|
|
||||||
// In-spec patterns take precedence
|
|
||||||
if obj.Spec.Ignore != nil {
|
|
||||||
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
|
|
||||||
}
|
|
||||||
matcher := sourceignore.NewMatcher(ps)
|
|
||||||
|
|
||||||
// Build up an index of object keys and their etags
|
|
||||||
// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
|
|
||||||
// detect both structural and file changes
|
|
||||||
objects := gcpClient.ListObjects(ctxTimeout, obj.Spec.BucketName, nil)
|
|
||||||
for {
|
|
||||||
object, err := objects.Next()
|
|
||||||
if err != nil {
|
|
||||||
if err == gcp.IteratorDone {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("failed to list objects from bucket '%s': %w", obj.Spec.BucketName, err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
|
||||||
}
|
|
||||||
|
|
||||||
if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if matcher.Match(strings.Split(object.Name, "/"), false) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
index[object.Name] = object.Etag
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate revision checksum from the collected index values
|
|
||||||
revision, err := index.Revision()
|
revision, err := index.Revision()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sreconcile.ResultEmpty, &serror.Event{
|
return sreconcile.ResultEmpty, &serror.Event{
|
||||||
|
|
@ -559,46 +429,32 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !obj.GetArtifact().HasRevision(revision) {
|
|
||||||
// Mark observations about the revision on the object
|
// Mark observations about the revision on the object
|
||||||
|
defer func() {
|
||||||
|
// As fetchIndexFiles can make last-minute modifications to the etag
|
||||||
|
// index, we need to re-calculate the revision at the end
|
||||||
|
revision, err := index.Revision()
|
||||||
|
if err != nil {
|
||||||
|
ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision after fetching etag index")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !obj.GetArtifact().HasRevision(revision) {
|
||||||
message := fmt.Sprintf("new upstream revision '%s'", revision)
|
message := fmt.Sprintf("new upstream revision '%s'", revision)
|
||||||
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
|
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
|
||||||
conditions.MarkReconciling(obj, "NewRevision", message)
|
conditions.MarkReconciling(obj, "NewRevision", message)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Download the files in parallel, but with a limited number of workers
|
if !obj.GetArtifact().HasRevision(revision) {
|
||||||
group, groupCtx := errgroup.WithContext(ctx)
|
if err = fetchIndexFiles(ctx, provider, obj, index, dir); err != nil {
|
||||||
group.Go(func() error {
|
e := &serror.Event{Err: err, Reason: sourcev1.BucketOperationFailedReason}
|
||||||
const workers = 4
|
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
|
||||||
sem := semaphore.NewWeighted(workers)
|
|
||||||
for key := range index {
|
|
||||||
k := key
|
|
||||||
if err := sem.Acquire(groupCtx, 1); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
group.Go(func() error {
|
|
||||||
defer sem.Release(1)
|
|
||||||
localPath := filepath.Join(dir, k)
|
|
||||||
if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath); err != nil {
|
|
||||||
return fmt.Errorf("failed to get '%s' file: %w", k, err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err = group.Wait(); err != nil {
|
|
||||||
e := &serror.Event{
|
|
||||||
Err: fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err),
|
|
||||||
Reason: sourcev1.BucketOperationFailedReason,
|
|
||||||
}
|
|
||||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
|
||||||
return sreconcile.ResultEmpty, e
|
return sreconcile.ResultEmpty, e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conditions.Delete(obj, sourcev1.FetchFailedCondition)
|
|
||||||
|
|
||||||
// Create potential new artifact
|
conditions.Delete(obj, sourcev1.FetchFailedCondition)
|
||||||
*artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
|
|
||||||
return sreconcile.ResultSuccess, nil
|
return sreconcile.ResultSuccess, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -609,8 +465,19 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context,
|
||||||
// If the given artifact does not differ from the object's current, it returns early.
|
// If the given artifact does not differ from the object's current, it returns early.
|
||||||
// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
|
// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
|
||||||
// updated to its path.
|
// updated to its path.
|
||||||
func (r *BucketReconciler) reconcileArtifact(ctx context.Context,
|
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
|
||||||
obj *sourcev1.Bucket, index etagIndex, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
// Calculate revision
|
||||||
|
revision, err := index.Revision()
|
||||||
|
if err != nil {
|
||||||
|
return sreconcile.ResultEmpty, &serror.Event{
|
||||||
|
Err: fmt.Errorf("failed to calculate revision of new artifact: %w", err),
|
||||||
|
Reason: meta.FailedReason,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create artifact
|
||||||
|
artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
|
||||||
|
|
||||||
// Always restore the Ready condition in case it got removed due to a transient error
|
// Always restore the Ready condition in case it got removed due to a transient error
|
||||||
defer func() {
|
defer func() {
|
||||||
if obj.GetArtifact().HasRevision(artifact.Revision) {
|
if obj.GetArtifact().HasRevision(artifact.Revision) {
|
||||||
|
|
@ -640,13 +507,13 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure artifact directory exists and acquire lock
|
// Ensure artifact directory exists and acquire lock
|
||||||
if err := r.Storage.MkdirAll(*artifact); err != nil {
|
if err := r.Storage.MkdirAll(artifact); err != nil {
|
||||||
return sreconcile.ResultEmpty, &serror.Event{
|
return sreconcile.ResultEmpty, &serror.Event{
|
||||||
Err: fmt.Errorf("failed to create artifact directory: %w", err),
|
Err: fmt.Errorf("failed to create artifact directory: %w", err),
|
||||||
Reason: sourcev1.StorageOperationFailedReason,
|
Reason: sourcev1.StorageOperationFailedReason,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
unlock, err := r.Storage.Lock(*artifact)
|
unlock, err := r.Storage.Lock(artifact)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sreconcile.ResultEmpty, &serror.Event{
|
return sreconcile.ResultEmpty, &serror.Event{
|
||||||
Err: fmt.Errorf("failed to acquire lock for artifact: %w", err),
|
Err: fmt.Errorf("failed to acquire lock for artifact: %w", err),
|
||||||
|
|
@ -656,7 +523,7 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context,
|
||||||
defer unlock()
|
defer unlock()
|
||||||
|
|
||||||
// Archive directory to storage
|
// Archive directory to storage
|
||||||
if err := r.Storage.Archive(artifact, dir, nil); err != nil {
|
if err := r.Storage.Archive(&artifact, dir, nil); err != nil {
|
||||||
return sreconcile.ResultEmpty, &serror.Event{
|
return sreconcile.ResultEmpty, &serror.Event{
|
||||||
Err: fmt.Errorf("unable to archive artifact to storage: %s", err),
|
Err: fmt.Errorf("unable to archive artifact to storage: %s", err),
|
||||||
Reason: sourcev1.StorageOperationFailedReason,
|
Reason: sourcev1.StorageOperationFailedReason,
|
||||||
|
|
@ -665,13 +532,13 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context,
|
||||||
r.annotatedEventLogf(ctx, obj, map[string]string{
|
r.annotatedEventLogf(ctx, obj, map[string]string{
|
||||||
"revision": artifact.Revision,
|
"revision": artifact.Revision,
|
||||||
"checksum": artifact.Checksum,
|
"checksum": artifact.Checksum,
|
||||||
}, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", len(index), obj.Spec.BucketName)
|
}, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName)
|
||||||
|
|
||||||
// Record it on the object
|
// Record it on the object
|
||||||
obj.Status.Artifact = artifact.DeepCopy()
|
obj.Status.Artifact = artifact.DeepCopy()
|
||||||
|
|
||||||
// Update symlink on a "best effort" basis
|
// Update symlink on a "best effort" basis
|
||||||
url, err := r.Storage.Symlink(*artifact, "latest.tar.gz")
|
url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.eventLogf(ctx, obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
|
r.eventLogf(ctx, obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
|
||||||
"failed to update status URL symlink: %s", err)
|
"failed to update status URL symlink: %s", err)
|
||||||
|
|
@ -729,74 +596,21 @@ func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Buc
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildMinioClient constructs a minio.Client with the data from the given object and Secret.
|
// getBucketSecret attempts to fetch the Secret reference if specified on the
|
||||||
// It returns an error if the Secret does not have the required fields, or if there is no credential handler
|
// obj. It returns any client error.
|
||||||
// configured.
|
func (r *BucketReconciler) getBucketSecret(ctx context.Context, obj *sourcev1.Bucket) (*corev1.Secret, error) {
|
||||||
func (r *BucketReconciler) buildMinioClient(obj *sourcev1.Bucket, secret *corev1.Secret) (*minio.Client, error) {
|
if obj.Spec.SecretRef == nil {
|
||||||
opts := minio.Options{
|
return nil, nil
|
||||||
Region: obj.Spec.Region,
|
|
||||||
Secure: !obj.Spec.Insecure,
|
|
||||||
}
|
}
|
||||||
if secret != nil {
|
secretName := types.NamespacedName{
|
||||||
var accessKey, secretKey string
|
Namespace: obj.GetNamespace(),
|
||||||
if k, ok := secret.Data["accesskey"]; ok {
|
Name: obj.Spec.SecretRef.Name,
|
||||||
accessKey = string(k)
|
|
||||||
}
|
}
|
||||||
if k, ok := secret.Data["secretkey"]; ok {
|
secret := &corev1.Secret{}
|
||||||
secretKey = string(k)
|
if err := r.Get(ctx, secretName, secret); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get secret '%s': %w", secretName.String(), err)
|
||||||
}
|
}
|
||||||
if accessKey == "" || secretKey == "" {
|
return secret, nil
|
||||||
return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
|
|
||||||
}
|
|
||||||
opts.Creds = credentials.NewStaticV4(accessKey, secretKey, "")
|
|
||||||
} else if obj.Spec.Provider == sourcev1.AmazonBucketProvider {
|
|
||||||
opts.Creds = credentials.NewIAM("")
|
|
||||||
}
|
|
||||||
return minio.New(obj.Spec.Endpoint, &opts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// buildGCPClient constructs a gcp.GCPClient with the data from the given Secret.
|
|
||||||
// It returns an error if the Secret does not have the required field, or if the client construction fails.
|
|
||||||
func (r *BucketReconciler) buildGCPClient(ctx context.Context, secret *corev1.Secret) (*gcp.GCPClient, error) {
|
|
||||||
var client *gcp.GCPClient
|
|
||||||
var err error
|
|
||||||
if secret != nil {
|
|
||||||
if err := gcp.ValidateSecret(secret.Data, secret.Name); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
client, err = gcp.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"]))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
client, err = gcp.NewClient(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return client, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// etagIndex is an index of bucket keys and their Etag values.
|
|
||||||
type etagIndex map[string]string
|
|
||||||
|
|
||||||
// Revision calculates the SHA256 checksum of the index.
|
|
||||||
// The keys are sorted to ensure a stable order, and the SHA256 sum is then calculated for the string representations of
|
|
||||||
// the key/value pairs, each pair written on a newline
|
|
||||||
// The sum result is returned as a string.
|
|
||||||
func (i etagIndex) Revision() (string, error) {
|
|
||||||
keyIndex := make([]string, 0, len(i))
|
|
||||||
for k := range i {
|
|
||||||
keyIndex = append(keyIndex, k)
|
|
||||||
}
|
|
||||||
sort.Strings(keyIndex)
|
|
||||||
sum := sha256.New()
|
|
||||||
for _, k := range keyIndex {
|
|
||||||
if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i[k]))); err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("%x", sum.Sum(nil)), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// eventLogf records event and logs at the same time.
|
// eventLogf records event and logs at the same time.
|
||||||
|
|
@ -819,3 +633,106 @@ func (r *BucketReconciler) annotatedEventLogf(ctx context.Context,
|
||||||
}
|
}
|
||||||
r.AnnotatedEventf(obj, annotations, eventType, reason, msg)
|
r.AnnotatedEventf(obj, annotations, eventType, reason, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fetchEtagIndex fetches the current etagIndex for the in the obj specified
|
||||||
|
// bucket using the given provider, while filtering them using .sourceignore
|
||||||
|
// rules. After fetching an object, the etag value in the index is updated to
|
||||||
|
// the current value to ensure accuracy.
|
||||||
|
func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error {
|
||||||
|
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Confirm bucket exists
|
||||||
|
exists, err := provider.BucketExists(ctxTimeout, obj.Spec.BucketName)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to confirm existence of '%s' bucket: %w", obj.Spec.BucketName, err)
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
err = fmt.Errorf("bucket '%s' not found", obj.Spec.BucketName)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look for file with ignore rules first
|
||||||
|
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
|
||||||
|
if _, err := provider.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
|
||||||
|
if !provider.ObjectIsNotFound(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ps, err := sourceignore.ReadIgnoreFile(path, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// In-spec patterns take precedence
|
||||||
|
if obj.Spec.Ignore != nil {
|
||||||
|
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
|
||||||
|
}
|
||||||
|
matcher := sourceignore.NewMatcher(ps)
|
||||||
|
|
||||||
|
// Build up index
|
||||||
|
err = provider.VisitObjects(ctxTimeout, obj.Spec.BucketName, func(key, etag string) error {
|
||||||
|
if strings.HasSuffix(key, "/") || key == sourceignore.IgnoreFile {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if matcher.Match(strings.Split(key, "/"), false) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
index.Add(key, etag)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("indexation of objects from bucket '%s' failed: %w", obj.Spec.BucketName, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchIndexFiles fetches the object files for the keys from the given etagIndex
|
||||||
|
// using the given provider, and stores them into tempDir. It downloads in
|
||||||
|
// parallel, but limited to the maxConcurrentBucketFetches.
|
||||||
|
// Given an index is provided, the bucket is assumed to exist.
|
||||||
|
func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error {
|
||||||
|
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Download in parallel, but bound the concurrency. According to
|
||||||
|
// AWS and GCP docs, rate limits are either soft or don't exist:
|
||||||
|
// - https://cloud.google.com/storage/quotas
|
||||||
|
// - https://docs.aws.amazon.com/general/latest/gr/s3.html
|
||||||
|
// .. so, the limiting factor is this process keeping a small footprint.
|
||||||
|
group, groupCtx := errgroup.WithContext(ctx)
|
||||||
|
group.Go(func() error {
|
||||||
|
sem := semaphore.NewWeighted(maxConcurrentBucketFetches)
|
||||||
|
for key, etag := range index.Index() {
|
||||||
|
k := key
|
||||||
|
t := etag
|
||||||
|
if err := sem.Acquire(groupCtx, 1); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
group.Go(func() error {
|
||||||
|
defer sem.Release(1)
|
||||||
|
localPath := filepath.Join(tempDir, k)
|
||||||
|
etag, err := provider.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath)
|
||||||
|
if err != nil {
|
||||||
|
if provider.ObjectIsNotFound(err) {
|
||||||
|
ctrl.LoggerFrom(ctx).Info(fmt.Sprintf("indexed object '%s' disappeared from '%s' bucket", k, obj.Spec.BucketName))
|
||||||
|
index.Delete(k)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("failed to get '%s' object: %w", k, err)
|
||||||
|
}
|
||||||
|
if t != etag {
|
||||||
|
index.Add(k, etag)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err := group.Wait(); err != nil {
|
||||||
|
return fmt.Errorf("fetch from bucket '%s' failed: %w", obj.Spec.BucketName, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,322 @@
|
||||||
|
/*
|
||||||
|
Copyright 2022 The Flux authors
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package controllers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gotest.tools/assert"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
|
||||||
|
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockBucketObject struct {
|
||||||
|
etag string
|
||||||
|
data string
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockBucketClient struct {
|
||||||
|
bucketName string
|
||||||
|
objects map[string]mockBucketObject
|
||||||
|
}
|
||||||
|
|
||||||
|
var mockNotFound = fmt.Errorf("not found")
|
||||||
|
|
||||||
|
func (m mockBucketClient) BucketExists(_ context.Context, name string) (bool, error) {
|
||||||
|
return name == m.bucketName, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockBucketClient) FGetObject(_ context.Context, bucket, obj, path string) (string, error) {
|
||||||
|
if bucket != m.bucketName {
|
||||||
|
return "", fmt.Errorf("bucket does not exist")
|
||||||
|
}
|
||||||
|
// tiny bit of protocol, for convenience: if asked for an object "error", then return an error.
|
||||||
|
if obj == "error" {
|
||||||
|
return "", fmt.Errorf("I was asked to report an error")
|
||||||
|
}
|
||||||
|
object, ok := m.objects[obj]
|
||||||
|
if !ok {
|
||||||
|
return "", mockNotFound
|
||||||
|
}
|
||||||
|
if err := os.WriteFile(path, []byte(object.data), os.FileMode(0660)); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return object.etag, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockBucketClient) ObjectIsNotFound(e error) bool {
|
||||||
|
return e == mockNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockBucketClient) VisitObjects(_ context.Context, _ string, f func(key, etag string) error) error {
|
||||||
|
for key, obj := range m.objects {
|
||||||
|
if err := f(key, obj.etag); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockBucketClient) Close(_ context.Context) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockBucketClient) addObject(key string, object mockBucketObject) {
|
||||||
|
if m.objects == nil {
|
||||||
|
m.objects = make(map[string]mockBucketObject)
|
||||||
|
}
|
||||||
|
m.objects[key] = object
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockBucketClient) objectsToEtagIndex() *etagIndex {
|
||||||
|
i := newEtagIndex()
|
||||||
|
for k, v := range m.objects {
|
||||||
|
i.Add(k, v.etag)
|
||||||
|
}
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_fetchEtagIndex(t *testing.T) {
|
||||||
|
bucketName := "all-my-config"
|
||||||
|
|
||||||
|
bucket := sourcev1.Bucket{
|
||||||
|
Spec: sourcev1.BucketSpec{
|
||||||
|
BucketName: bucketName,
|
||||||
|
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("fetches etag index", func(t *testing.T) {
|
||||||
|
tmp, err := os.MkdirTemp("", "test-bucket")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmp)
|
||||||
|
|
||||||
|
client := mockBucketClient{bucketName: bucketName}
|
||||||
|
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"})
|
||||||
|
client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"})
|
||||||
|
client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"})
|
||||||
|
|
||||||
|
index := newEtagIndex()
|
||||||
|
err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, index.Len(), 3)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("an error while bucket does not exist", func(t *testing.T) {
|
||||||
|
tmp, err := os.MkdirTemp("", "test-bucket")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmp)
|
||||||
|
|
||||||
|
client := mockBucketClient{bucketName: "other-bucket-name"}
|
||||||
|
|
||||||
|
index := newEtagIndex()
|
||||||
|
err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
|
||||||
|
assert.ErrorContains(t, err, "not found")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("filters with .sourceignore rules", func(t *testing.T) {
|
||||||
|
tmp, err := os.MkdirTemp("", "test-bucket")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmp)
|
||||||
|
|
||||||
|
client := mockBucketClient{bucketName: bucketName}
|
||||||
|
client.addObject(".sourceignore", mockBucketObject{etag: "sourceignore1", data: `*.txt`})
|
||||||
|
client.addObject("foo.yaml", mockBucketObject{etag: "etag1", data: "foo.yaml"})
|
||||||
|
client.addObject("foo.txt", mockBucketObject{etag: "etag2", data: "foo.txt"})
|
||||||
|
|
||||||
|
index := newEtagIndex()
|
||||||
|
err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := os.Stat(filepath.Join(tmp, ".sourceignore")); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok := index.Has("foo.txt"); ok {
|
||||||
|
t.Error(fmt.Errorf("expected 'foo.txt' index item to not exist"))
|
||||||
|
}
|
||||||
|
assert.Equal(t, index.Len(), 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("filters with ignore rules from object", func(t *testing.T) {
|
||||||
|
tmp, err := os.MkdirTemp("", "test-bucket")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmp)
|
||||||
|
|
||||||
|
client := mockBucketClient{bucketName: bucketName}
|
||||||
|
client.addObject(".sourceignore", mockBucketObject{etag: "sourceignore1", data: `*.txt`})
|
||||||
|
client.addObject("foo.txt", mockBucketObject{etag: "etag1", data: "foo.txt"})
|
||||||
|
|
||||||
|
ignore := "!*.txt"
|
||||||
|
bucket := bucket.DeepCopy()
|
||||||
|
bucket.Spec.Ignore = &ignore
|
||||||
|
|
||||||
|
index := newEtagIndex()
|
||||||
|
err = fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := os.Stat(filepath.Join(tmp, ".sourceignore")); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, index.Len(), 1)
|
||||||
|
if ok := index.Has("foo.txt"); !ok {
|
||||||
|
t.Error(fmt.Errorf("expected 'foo.txt' index item to exist"))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_fetchFiles(t *testing.T) {
|
||||||
|
bucketName := "all-my-config"
|
||||||
|
|
||||||
|
bucket := sourcev1.Bucket{
|
||||||
|
Spec: sourcev1.BucketSpec{
|
||||||
|
BucketName: bucketName,
|
||||||
|
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("fetches files", func(t *testing.T) {
|
||||||
|
tmp, err := os.MkdirTemp("", "test-bucket")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmp)
|
||||||
|
|
||||||
|
client := mockBucketClient{bucketName: bucketName}
|
||||||
|
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"})
|
||||||
|
client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"})
|
||||||
|
client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"})
|
||||||
|
|
||||||
|
index := client.objectsToEtagIndex()
|
||||||
|
|
||||||
|
err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for path := range index.Index() {
|
||||||
|
p := filepath.Join(tmp, path)
|
||||||
|
_, err := os.Stat(p)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("an error while fetching returns an error for the whole procedure", func(t *testing.T) {
|
||||||
|
tmp, err := os.MkdirTemp("", "test-bucket")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmp)
|
||||||
|
|
||||||
|
client := mockBucketClient{bucketName: bucketName, objects: map[string]mockBucketObject{}}
|
||||||
|
client.objects["error"] = mockBucketObject{}
|
||||||
|
|
||||||
|
err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), client.objectsToEtagIndex(), tmp)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error but got nil")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("a changed etag updates the index", func(t *testing.T) {
|
||||||
|
tmp, err := os.MkdirTemp("", "test-bucket")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmp)
|
||||||
|
|
||||||
|
client := mockBucketClient{bucketName: bucketName}
|
||||||
|
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag2"})
|
||||||
|
|
||||||
|
index := newEtagIndex()
|
||||||
|
index.Add("foo.yaml", "etag1")
|
||||||
|
err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
f := index.Get("foo.yaml")
|
||||||
|
assert.Equal(t, f, "etag2")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("a disappeared index entry is removed from the index", func(t *testing.T) {
|
||||||
|
tmp, err := os.MkdirTemp("", "test-bucket")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmp)
|
||||||
|
|
||||||
|
client := mockBucketClient{bucketName: bucketName}
|
||||||
|
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"})
|
||||||
|
|
||||||
|
index := newEtagIndex()
|
||||||
|
index.Add("foo.yaml", "etag1")
|
||||||
|
// Does not exist on server
|
||||||
|
index.Add("bar.yaml", "etag2")
|
||||||
|
|
||||||
|
err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
f := index.Get("foo.yaml")
|
||||||
|
assert.Equal(t, f, "etag1")
|
||||||
|
assert.Check(t, !index.Has("bar.yaml"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("can fetch more than maxConcurrentFetches", func(t *testing.T) {
|
||||||
|
// this will fail if, for example, the semaphore is not used correctly and blocks
|
||||||
|
tmp, err := os.MkdirTemp("", "test-bucket")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmp)
|
||||||
|
|
||||||
|
client := mockBucketClient{bucketName: bucketName}
|
||||||
|
for i := 0; i < 2*maxConcurrentBucketFetches; i++ {
|
||||||
|
f := fmt.Sprintf("file-%d", i)
|
||||||
|
client.addObject(f, mockBucketObject{etag: f, data: f})
|
||||||
|
}
|
||||||
|
index := client.objectsToEtagIndex()
|
||||||
|
|
||||||
|
err = fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
File diff suppressed because it is too large
Load Diff
2
go.mod
2
go.mod
|
|
@ -26,6 +26,7 @@ require (
|
||||||
github.com/go-git/go-billy/v5 v5.3.1
|
github.com/go-git/go-billy/v5 v5.3.1
|
||||||
github.com/go-git/go-git/v5 v5.4.2
|
github.com/go-git/go-git/v5 v5.4.2
|
||||||
github.com/go-logr/logr v1.2.2
|
github.com/go-logr/logr v1.2.2
|
||||||
|
github.com/google/uuid v1.3.0
|
||||||
github.com/libgit2/git2go/v33 v33.0.6
|
github.com/libgit2/git2go/v33 v33.0.6
|
||||||
github.com/minio/minio-go/v7 v7.0.15
|
github.com/minio/minio-go/v7 v7.0.15
|
||||||
github.com/onsi/gomega v1.17.0
|
github.com/onsi/gomega v1.17.0
|
||||||
|
|
@ -97,7 +98,6 @@ require (
|
||||||
github.com/google/go-cmp v0.5.7 // indirect
|
github.com/google/go-cmp v0.5.7 // indirect
|
||||||
github.com/google/gofuzz v1.2.0 // indirect
|
github.com/google/gofuzz v1.2.0 // indirect
|
||||||
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
|
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
|
||||||
github.com/google/uuid v1.3.0 // indirect
|
|
||||||
github.com/googleapis/gax-go/v2 v2.1.1 // indirect
|
github.com/googleapis/gax-go/v2 v2.1.1 // indirect
|
||||||
github.com/googleapis/gnostic v0.5.5 // indirect
|
github.com/googleapis/gnostic v0.5.5 // indirect
|
||||||
github.com/gorilla/mux v1.8.0 // indirect
|
github.com/gorilla/mux v1.8.0 // indirect
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,220 @@
|
||||||
|
/*
|
||||||
|
Copyright 2022 The Flux authors
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package gcs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/md5"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
raw "google.golang.org/api/storage/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ObjectNotFound = errors.New("object not found")
|
||||||
|
)
|
||||||
|
|
||||||
|
// Object is a mock Server object.
|
||||||
|
type Object struct {
|
||||||
|
Key string
|
||||||
|
Generation int64
|
||||||
|
MetaGeneration int64
|
||||||
|
ContentType string
|
||||||
|
Content []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// Server is a simple Google Cloud Storage mock server.
|
||||||
|
// It serves the provided Objects for the BucketName on the HTTPAddress when
|
||||||
|
// Start or StartTLS is called.
|
||||||
|
// It provides primitive support "Generation Conditions" when Object contents
|
||||||
|
// are fetched.
|
||||||
|
// Ref: https://pkg.go.dev/cloud.google.com/go/storage#hdr-Conditions
|
||||||
|
type Server struct {
|
||||||
|
srv *httptest.Server
|
||||||
|
mux *http.ServeMux
|
||||||
|
|
||||||
|
BucketName string
|
||||||
|
Objects []*Object
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServer(bucketName string) *Server {
|
||||||
|
s := &Server{BucketName: bucketName}
|
||||||
|
s.mux = http.NewServeMux()
|
||||||
|
s.mux.Handle("/", http.HandlerFunc(s.handler))
|
||||||
|
|
||||||
|
s.srv = httptest.NewUnstartedServer(s.mux)
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) Start() {
|
||||||
|
s.srv.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) StartTLS(config *tls.Config) {
|
||||||
|
s.srv.TLS = config
|
||||||
|
s.srv.StartTLS()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) Stop() {
|
||||||
|
s.srv.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) HTTPAddress() string {
|
||||||
|
return s.srv.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) getAllObjects() *raw.Objects {
|
||||||
|
objs := &raw.Objects{}
|
||||||
|
for _, o := range s.Objects {
|
||||||
|
objs.Items = append(objs.Items, getGCSObject(s.BucketName, *o))
|
||||||
|
}
|
||||||
|
return objs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) getObjectFile(key string, generation int64) ([]byte, error) {
|
||||||
|
for _, o := range s.Objects {
|
||||||
|
if o.Key == key {
|
||||||
|
if generation == 0 || generation == o.Generation {
|
||||||
|
return o.Content, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, ObjectNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch {
|
||||||
|
// Handle Bucket metadata related queries
|
||||||
|
case strings.HasPrefix(r.RequestURI, "/b/"):
|
||||||
|
switch {
|
||||||
|
// Return metadata about the Bucket
|
||||||
|
case r.RequestURI == fmt.Sprintf("/b/%s?alt=json&prettyPrint=false&projection=full", s.BucketName):
|
||||||
|
etag := md5.New()
|
||||||
|
for _, v := range s.Objects {
|
||||||
|
etag.Write(v.Content)
|
||||||
|
}
|
||||||
|
response := getGCSBucket(s.BucketName, fmt.Sprintf("%x", etag.Sum(nil)))
|
||||||
|
jsonResponse, err := json.Marshal(response)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(500)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(200)
|
||||||
|
w.Write(jsonResponse)
|
||||||
|
return
|
||||||
|
// Return metadata about a Bucket object
|
||||||
|
case strings.Contains(r.RequestURI, "/o/"):
|
||||||
|
var obj *Object
|
||||||
|
for _, o := range s.Objects {
|
||||||
|
// The object key in the URI is escaped.
|
||||||
|
// e.g.: /b/dummy/o/included%2Ffile.txt?alt=json&prettyPrint=false&projection=full
|
||||||
|
if r.RequestURI == fmt.Sprintf("/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", s.BucketName, url.QueryEscape(o.Key)) {
|
||||||
|
obj = o
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if obj != nil {
|
||||||
|
response := getGCSObject(s.BucketName, *obj)
|
||||||
|
jsonResponse, err := json.Marshal(response)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(500)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(200)
|
||||||
|
w.Write(jsonResponse)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(404)
|
||||||
|
return
|
||||||
|
// Return metadata about all objects in the Bucket
|
||||||
|
case strings.Contains(r.RequestURI, "/o?"):
|
||||||
|
response := s.getAllObjects()
|
||||||
|
jsonResponse, err := json.Marshal(response)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(500)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(200)
|
||||||
|
w.Write(jsonResponse)
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
w.WriteHeader(404)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Handle object file query
|
||||||
|
default:
|
||||||
|
bucketPrefix := fmt.Sprintf("/%s/", s.BucketName)
|
||||||
|
if strings.HasPrefix(r.RequestURI, bucketPrefix) {
|
||||||
|
// The URL path is of the format /<bucket>/included/file.txt.
|
||||||
|
// Extract the object key by discarding the bucket prefix.
|
||||||
|
key := strings.TrimPrefix(r.URL.Path, bucketPrefix)
|
||||||
|
|
||||||
|
// Support "Generation Conditions"
|
||||||
|
// https://pkg.go.dev/cloud.google.com/go/storage#hdr-Conditions
|
||||||
|
var generation int64
|
||||||
|
if matchGeneration := r.URL.Query().Get("ifGenerationMatch"); matchGeneration != "" {
|
||||||
|
var err error
|
||||||
|
if generation, err = strconv.ParseInt(matchGeneration, 10, 64); err != nil {
|
||||||
|
w.WriteHeader(500)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle returning object file in a bucket.
|
||||||
|
response, err := s.getObjectFile(key, generation)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(404)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(200)
|
||||||
|
w.Write(response)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(404)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getGCSObject(bucket string, obj Object) *raw.Object {
|
||||||
|
hash := md5.Sum(obj.Content)
|
||||||
|
etag := fmt.Sprintf("%x", hash)
|
||||||
|
return &raw.Object{
|
||||||
|
Bucket: bucket,
|
||||||
|
Name: obj.Key,
|
||||||
|
ContentType: obj.ContentType,
|
||||||
|
Generation: obj.Generation,
|
||||||
|
Metageneration: obj.MetaGeneration,
|
||||||
|
Md5Hash: etag,
|
||||||
|
Etag: etag,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getGCSBucket(name, eTag string) *raw.Bucket {
|
||||||
|
return &raw.Bucket{
|
||||||
|
Name: name,
|
||||||
|
Location: "loc",
|
||||||
|
Etag: eTag,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,157 @@
|
||||||
|
/*
|
||||||
|
Copyright 2022 The Flux authors
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package s3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/md5"
|
||||||
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Object is a mock Server object.
|
||||||
|
type Object struct {
|
||||||
|
Key string
|
||||||
|
LastModified time.Time
|
||||||
|
ContentType string
|
||||||
|
Content []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// Server is a simple AWS S3 mock server.
|
||||||
|
// It serves the provided Objects for the BucketName on the HTTPAddress when
|
||||||
|
// Start or StartTLS is called.
|
||||||
|
type Server struct {
|
||||||
|
srv *httptest.Server
|
||||||
|
mux *http.ServeMux
|
||||||
|
|
||||||
|
BucketName string
|
||||||
|
Objects []*Object
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServer(bucketName string) *Server {
|
||||||
|
s := &Server{BucketName: bucketName}
|
||||||
|
s.mux = http.NewServeMux()
|
||||||
|
s.mux.Handle("/", http.HandlerFunc(s.handler))
|
||||||
|
|
||||||
|
s.srv = httptest.NewUnstartedServer(s.mux)
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) Start() {
|
||||||
|
s.srv.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) StartTLS(config *tls.Config) {
|
||||||
|
s.srv.TLS = config
|
||||||
|
s.srv.StartTLS()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) Stop() {
|
||||||
|
s.srv.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) HTTPAddress() string {
|
||||||
|
return s.srv.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
key := path.Base(r.URL.Path)
|
||||||
|
|
||||||
|
switch key {
|
||||||
|
case s.BucketName:
|
||||||
|
w.Header().Add("Content-Type", "application/xml")
|
||||||
|
|
||||||
|
if r.Method == http.MethodHead {
|
||||||
|
w.WriteHeader(200)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.URL.Query().Has("location") {
|
||||||
|
w.WriteHeader(200)
|
||||||
|
w.Write([]byte(`
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">Europe</LocationConstraint>
|
||||||
|
`))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
contents := ""
|
||||||
|
for _, o := range s.Objects {
|
||||||
|
etag := md5.Sum(o.Content)
|
||||||
|
contents += fmt.Sprintf(`
|
||||||
|
<Contents>
|
||||||
|
<Key>%s</Key>
|
||||||
|
<LastModified>%s</LastModified>
|
||||||
|
<Size>%d</Size>
|
||||||
|
<ETag>"%x"</ETag>
|
||||||
|
<StorageClass>STANDARD</StorageClass>
|
||||||
|
</Contents>`, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(w, `
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
|
||||||
|
<Name>%s</Name>
|
||||||
|
<Prefix/>
|
||||||
|
<Marker/>
|
||||||
|
<KeyCount>%d</KeyCount>
|
||||||
|
<MaxKeys>1000</MaxKeys>
|
||||||
|
<IsTruncated>false</IsTruncated>
|
||||||
|
%s
|
||||||
|
</ListBucketResult>
|
||||||
|
`, s.BucketName, len(s.Objects), contents)
|
||||||
|
default:
|
||||||
|
key, err := filepath.Rel("/"+s.BucketName, r.URL.Path)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(500)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var found *Object
|
||||||
|
for _, o := range s.Objects {
|
||||||
|
if key == o.Key {
|
||||||
|
found = o
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if found == nil {
|
||||||
|
w.WriteHeader(404)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
etag := md5.Sum(found.Content)
|
||||||
|
lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1)
|
||||||
|
|
||||||
|
w.Header().Add("Content-Type", found.ContentType)
|
||||||
|
w.Header().Add("Last-Modified", lastModified)
|
||||||
|
w.Header().Add("ETag", fmt.Sprintf("\"%x\"", etag))
|
||||||
|
w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content)))
|
||||||
|
|
||||||
|
if r.Method == http.MethodHead {
|
||||||
|
w.WriteHeader(200)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(200)
|
||||||
|
w.Write(found.Content)
|
||||||
|
}
|
||||||
|
}
|
||||||
142
pkg/gcp/gcp.go
142
pkg/gcp/gcp.go
|
|
@ -28,6 +28,8 @@ import (
|
||||||
"github.com/go-logr/logr"
|
"github.com/go-logr/logr"
|
||||||
"google.golang.org/api/iterator"
|
"google.golang.org/api/iterator"
|
||||||
"google.golang.org/api/option"
|
"google.golang.org/api/option"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
ctrl "sigs.k8s.io/controller-runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -37,12 +39,10 @@ var (
|
||||||
// ErrorDirectoryExists is an error returned when the filename provided
|
// ErrorDirectoryExists is an error returned when the filename provided
|
||||||
// is a directory.
|
// is a directory.
|
||||||
ErrorDirectoryExists = errors.New("filename is a directory")
|
ErrorDirectoryExists = errors.New("filename is a directory")
|
||||||
// ErrorObjectDoesNotExist is an error returned when the object whose name
|
|
||||||
// is provided does not exist.
|
|
||||||
ErrorObjectDoesNotExist = errors.New("object does not exist")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type GCPClient struct {
|
// GCSClient is a minimal Google Cloud Storage client for fetching objects.
|
||||||
|
type GCSClient struct {
|
||||||
// client for interacting with the Google Cloud
|
// client for interacting with the Google Cloud
|
||||||
// Storage APIs.
|
// Storage APIs.
|
||||||
*gcpstorage.Client
|
*gcpstorage.Client
|
||||||
|
|
@ -50,27 +50,39 @@ type GCPClient struct {
|
||||||
|
|
||||||
// NewClient creates a new GCP storage client. The Client will automatically look for the Google Application
|
// NewClient creates a new GCP storage client. The Client will automatically look for the Google Application
|
||||||
// Credential environment variable or look for the Google Application Credential file.
|
// Credential environment variable or look for the Google Application Credential file.
|
||||||
func NewClient(ctx context.Context, opts ...option.ClientOption) (*GCPClient, error) {
|
func NewClient(ctx context.Context, secret *corev1.Secret) (*GCSClient, error) {
|
||||||
client, err := gcpstorage.NewClient(ctx, opts...)
|
c := &GCSClient{}
|
||||||
|
if secret != nil {
|
||||||
|
client, err := gcpstorage.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"]))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
c.Client = client
|
||||||
return &GCPClient{Client: client}, nil
|
} else {
|
||||||
|
client, err := gcpstorage.NewClient(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
c.Client = client
|
||||||
|
}
|
||||||
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ValidateSecret validates the credential secrets
|
// ValidateSecret validates the credential secret. The provided Secret may
|
||||||
// It ensures that needed secret fields are not missing.
|
// be nil.
|
||||||
func ValidateSecret(secret map[string][]byte, name string) error {
|
func ValidateSecret(secret *corev1.Secret) error {
|
||||||
if _, exists := secret["serviceaccount"]; !exists {
|
if secret == nil {
|
||||||
return fmt.Errorf("invalid '%s' secret data: required fields 'serviceaccount'", name)
|
return nil
|
||||||
|
}
|
||||||
|
if _, exists := secret.Data["serviceaccount"]; !exists {
|
||||||
|
return fmt.Errorf("invalid '%s' secret data: required fields 'serviceaccount'", secret.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BucketExists checks if the bucket with the provided name exists.
|
// BucketExists returns if an object storage bucket with the provided name
|
||||||
func (c *GCPClient) BucketExists(ctx context.Context, bucketName string) (bool, error) {
|
// exists, or returns a (client) error.
|
||||||
|
func (c *GCSClient) BucketExists(ctx context.Context, bucketName string) (bool, error) {
|
||||||
_, err := c.Client.Bucket(bucketName).Attrs(ctx)
|
_, err := c.Client.Bucket(bucketName).Attrs(ctx)
|
||||||
if err == gcpstorage.ErrBucketNotExist {
|
if err == gcpstorage.ErrBucketNotExist {
|
||||||
// Not returning error to be compatible with minio's API.
|
// Not returning error to be compatible with minio's API.
|
||||||
|
|
@ -82,34 +94,23 @@ func (c *GCPClient) BucketExists(ctx context.Context, bucketName string) (bool,
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectExists checks if the object with the provided name exists.
|
// FGetObject gets the object from the provided object storage bucket, and
|
||||||
func (c *GCPClient) ObjectExists(ctx context.Context, bucketName, objectName string) (bool, error) {
|
// writes it to targetPath.
|
||||||
_, err := c.Client.Bucket(bucketName).Object(objectName).Attrs(ctx)
|
// It returns the etag of the successfully fetched file, or any error.
|
||||||
// ErrObjectNotExist is returned if the object does not exist
|
func (c *GCSClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) (string, error) {
|
||||||
if err == gcpstorage.ErrObjectNotExist {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FGetObject gets the object from the bucket and downloads the object locally
|
|
||||||
func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) error {
|
|
||||||
// Verify if destination already exists.
|
// Verify if destination already exists.
|
||||||
dirStatus, err := os.Stat(localPath)
|
dirStatus, err := os.Stat(localPath)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// If the destination exists and is a directory.
|
// If the destination exists and is a directory.
|
||||||
if dirStatus.IsDir() {
|
if dirStatus.IsDir() {
|
||||||
return ErrorDirectoryExists
|
return "", ErrorDirectoryExists
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proceed if file does not exist. return for all other errors.
|
// Proceed if file does not exist. return for all other errors.
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -118,56 +119,79 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca
|
||||||
if objectDir != "" {
|
if objectDir != "" {
|
||||||
// Create any missing top level directories.
|
// Create any missing top level directories.
|
||||||
if err := os.MkdirAll(objectDir, 0700); err != nil {
|
if err := os.MkdirAll(objectDir, 0700); err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectExists verifies if object exists and you have permission to access.
|
// Get Object attributes.
|
||||||
// Check if the object exists and if you have permission to access it.
|
objAttr, err := c.Client.Bucket(bucketName).Object(objectName).Attrs(ctx)
|
||||||
exists, err := c.ObjectExists(ctx, bucketName, objectName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
|
||||||
if !exists {
|
|
||||||
return ErrorObjectDoesNotExist
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Prepare target file.
|
||||||
objectFile, err := os.OpenFile(localPath, os.O_CREATE|os.O_WRONLY, 0600)
|
objectFile, err := os.OpenFile(localPath, os.O_CREATE|os.O_WRONLY, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get Object from GCP Bucket
|
// Get Object data.
|
||||||
objectReader, err := c.Client.Bucket(bucketName).Object(objectName).NewReader(ctx)
|
objectReader, err := c.Client.Bucket(bucketName).Object(objectName).If(gcpstorage.Conditions{
|
||||||
|
GenerationMatch: objAttr.Generation,
|
||||||
|
}).NewReader(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
defer objectReader.Close()
|
defer func() {
|
||||||
|
if err = objectReader.Close(); err != nil {
|
||||||
|
ctrl.LoggerFrom(ctx).Error(err, "failed to close object reader")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Write Object to file.
|
// Write Object to file.
|
||||||
if _, err := io.Copy(objectFile, objectReader); err != nil {
|
if _, err := io.Copy(objectFile, objectReader); err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the file.
|
// Close the file.
|
||||||
if err := objectFile.Close(); err != nil {
|
if err := objectFile.Close(); err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return objAttr.Etag, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// VisitObjects iterates over the items in the provided object storage
|
||||||
|
// bucket, calling visit for every item.
|
||||||
|
// If the underlying client or the visit callback returns an error,
|
||||||
|
// it returns early.
|
||||||
|
func (c *GCSClient) VisitObjects(ctx context.Context, bucketName string, visit func(path, etag string) error) error {
|
||||||
|
items := c.Client.Bucket(bucketName).Objects(ctx, nil)
|
||||||
|
for {
|
||||||
|
object, err := items.Next()
|
||||||
|
if err == IteratorDone {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = visit(object.Name, object.Etag); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListObjects lists the objects/contents of the bucket whose bucket name is provided.
|
// Close closes the GCP Client and logs any useful errors.
|
||||||
// the objects are returned as an Objectiterator and .Next() has to be called on them
|
func (c *GCSClient) Close(ctx context.Context) {
|
||||||
// to loop through the Objects.
|
log := logr.FromContextOrDiscard(ctx)
|
||||||
func (c *GCPClient) ListObjects(ctx context.Context, bucketName string, query *gcpstorage.Query) *gcpstorage.ObjectIterator {
|
if err := c.Client.Close(); err != nil {
|
||||||
items := c.Client.Bucket(bucketName).Objects(ctx, query)
|
log.Error(err, "closing GCP client")
|
||||||
return items
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the GCP Client and logs any useful errors
|
// ObjectIsNotFound checks if the error provided is storage.ErrObjectNotExist.
|
||||||
func (c *GCPClient) Close(log logr.Logger) {
|
func (c *GCSClient) ObjectIsNotFound(err error) bool {
|
||||||
if err := c.Client.Close(); err != nil {
|
return errors.Is(err, gcpstorage.ErrObjectNotExist)
|
||||||
log.Error(err, "GCP Provider")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package gcp_test
|
package gcp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
@ -32,10 +32,11 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
gcpstorage "cloud.google.com/go/storage"
|
gcpstorage "cloud.google.com/go/storage"
|
||||||
"github.com/fluxcd/source-controller/pkg/gcp"
|
|
||||||
"google.golang.org/api/googleapi"
|
"google.golang.org/api/googleapi"
|
||||||
raw "google.golang.org/api/storage/v1"
|
raw "google.golang.org/api/storage/v1"
|
||||||
"gotest.tools/assert"
|
"gotest.tools/assert"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
|
||||||
"google.golang.org/api/option"
|
"google.golang.org/api/option"
|
||||||
)
|
)
|
||||||
|
|
@ -43,6 +44,8 @@ import (
|
||||||
const (
|
const (
|
||||||
bucketName string = "test-bucket"
|
bucketName string = "test-bucket"
|
||||||
objectName string = "test.yaml"
|
objectName string = "test.yaml"
|
||||||
|
objectGeneration int64 = 3
|
||||||
|
objectEtag string = "bFbHCDvedeecefdgmfmhfuRxBdcedGe96S82XJOAXxjJpk="
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -50,12 +53,34 @@ var (
|
||||||
client *gcpstorage.Client
|
client *gcpstorage.Client
|
||||||
close func()
|
close func()
|
||||||
err error
|
err error
|
||||||
|
secret = corev1.Secret{
|
||||||
|
ObjectMeta: v1.ObjectMeta{
|
||||||
|
Name: "gcp-secret",
|
||||||
|
Namespace: "default",
|
||||||
|
},
|
||||||
|
Data: map[string][]byte{
|
||||||
|
"serviceaccount": []byte("ewogICAgInR5cGUiOiAic2VydmljZV9hY2NvdW50IiwKICAgICJwcm9qZWN0X2lkIjogInBvZGluZm8iLAogICAgInByaXZhdGVfa2V5X2lkIjogIjI4cXdnaDNnZGY1aGozZ2I1ZmozZ3N1NXlmZ2gzNGY0NTMyNDU2OGh5MiIsCiAgICAicHJpdmF0ZV9rZXkiOiAiLS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tXG5Id2V0aGd5MTIzaHVnZ2hoaGJkY3U2MzU2ZGd5amhzdmd2R0ZESFlnY2RqYnZjZGhic3g2M2Ncbjc2dGd5Y2ZlaHVoVkdURllmdzZ0N3lkZ3lWZ3lkaGV5aHVnZ3ljdWhland5NnQzNWZ0aHl1aGVndmNldGZcblRGVUhHVHlnZ2h1Ymh4ZTY1eWd0NnRneWVkZ3kzMjZodWN5dnN1aGJoY3Zjc2poY3NqaGNzdmdkdEhGQ0dpXG5IY3llNnR5eWczZ2Z5dWhjaGNzYmh5Z2NpamRiaHl5VEY2NnR1aGNldnVoZGNiaHVoaHZmdGN1aGJoM3VoN3Q2eVxuZ2d2ZnRVSGJoNnQ1cmZ0aGh1R1ZSdGZqaGJmY3JkNXI2N3l1aHV2Z0ZUWWpndnRmeWdoYmZjZHJoeWpoYmZjdGZkZnlodmZnXG50Z3ZnZ3RmeWdodmZ0NnR1Z3ZURjVyNjZ0dWpoZ3ZmcnR5aGhnZmN0Nnk3eXRmcjVjdHZnaGJoaHZ0Z2hoanZjdHRmeWNmXG5mZnhmZ2hqYnZnY2d5dDY3dWpiZ3ZjdGZ5aFZDN3VodmdjeWp2aGhqdnl1amNcbmNnZ2hndmdjZmhnZzc2NTQ1NHRjZnRoaGdmdHloaHZ2eXZ2ZmZnZnJ5eXU3N3JlcmVkc3dmdGhoZ2ZjZnR5Y2ZkcnR0ZmhmL1xuLS0tLS1FTkQgUFJJVkFURSBLRVktLS0tLVxuIiwKICAgICJjbGllbnRfZW1haWwiOiAidGVzdEBwb2RpbmZvLmlhbS5nc2VydmljZWFjY291bnQuY29tIiwKICAgICJjbGllbnRfaWQiOiAiMzI2NTc2MzQ2Nzg3NjI1MzY3NDYiLAogICAgImF1dGhfdXJpIjogImh0dHBzOi8vYWNjb3VudHMuZ29vZ2xlLmNvbS9vL29hdXRoMi9hdXRoIiwKICAgICJ0b2tlbl91cmkiOiAiaHR0cHM6Ly9vYXV0aDIuZ29vZ2xlYXBpcy5jb20vdG9rZW4iLAogICAgImF1dGhfcHJvdmlkZXJfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9vYXV0aDIvdjEvY2VydHMiLAogICAgImNsaWVudF94NTA5X2NlcnRfdXJsIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL3JvYm90L3YxL21ldGFkYXRhL3g1MDkvdGVzdCU0MHBvZGluZm8uaWFtLmdzZXJ2aWNlYWNjb3VudC5jb20iCn0="),
|
||||||
|
},
|
||||||
|
Type: "Opaque",
|
||||||
|
}
|
||||||
|
badSecret = corev1.Secret{
|
||||||
|
ObjectMeta: v1.ObjectMeta{
|
||||||
|
Name: "gcp-secret",
|
||||||
|
Namespace: "default",
|
||||||
|
},
|
||||||
|
Data: map[string][]byte{
|
||||||
|
"username": []byte("test-user"),
|
||||||
|
},
|
||||||
|
Type: "Opaque",
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMain(m *testing.M) {
|
func TestMain(m *testing.M) {
|
||||||
hc, close = newTestServer(func(w http.ResponseWriter, r *http.Request) {
|
hc, close = newTestServer(func(w http.ResponseWriter, r *http.Request) {
|
||||||
io.Copy(io.Discard, r.Body)
|
io.Copy(io.Discard, r.Body)
|
||||||
if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s?alt=json&prettyPrint=false&projection=full", bucketName) {
|
|
||||||
|
switch r.RequestURI {
|
||||||
|
case fmt.Sprintf("/storage/v1/b/%s?alt=json&prettyPrint=false&projection=full", bucketName):
|
||||||
w.WriteHeader(200)
|
w.WriteHeader(200)
|
||||||
response := getBucket()
|
response := getBucket()
|
||||||
jsonResponse, err := json.Marshal(response)
|
jsonResponse, err := json.Marshal(response)
|
||||||
|
|
@ -66,7 +91,7 @@ func TestMain(m *testing.M) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error writing jsonResponse %v\n", err)
|
log.Fatalf("error writing jsonResponse %v\n", err)
|
||||||
}
|
}
|
||||||
} else if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName) {
|
case fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName):
|
||||||
w.WriteHeader(200)
|
w.WriteHeader(200)
|
||||||
response := getObject()
|
response := getObject()
|
||||||
jsonResponse, err := json.Marshal(response)
|
jsonResponse, err := json.Marshal(response)
|
||||||
|
|
@ -77,9 +102,10 @@ func TestMain(m *testing.M) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error writing jsonResponse %v\n", err)
|
log.Fatalf("error writing jsonResponse %v\n", err)
|
||||||
}
|
}
|
||||||
} else if r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o?alt=json&delimiter=&endOffset=&pageToken=&prefix=&prettyPrint=false&projection=full&startOffset=&versions=false", bucketName) {
|
case fmt.Sprintf("/storage/v1/b/%s/o?alt=json&delimiter=&endOffset=&pageToken=&prefix=&prettyPrint=false&projection=full&startOffset=&versions=false", bucketName):
|
||||||
w.WriteHeader(200)
|
w.WriteHeader(200)
|
||||||
response := getObject()
|
response := &raw.Objects{}
|
||||||
|
response.Items = append(response.Items, getObject())
|
||||||
jsonResponse, err := json.Marshal(response)
|
jsonResponse, err := json.Marshal(response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error marshalling response %v\n", err)
|
log.Fatalf("error marshalling response %v\n", err)
|
||||||
|
|
@ -88,14 +114,16 @@ func TestMain(m *testing.M) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error writing jsonResponse %v\n", err)
|
log.Fatalf("error writing jsonResponse %v\n", err)
|
||||||
}
|
}
|
||||||
} else if r.RequestURI == fmt.Sprintf("/%s/test.yaml", bucketName) || r.RequestURI == fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName) {
|
case fmt.Sprintf("/%s/test.yaml", bucketName),
|
||||||
|
fmt.Sprintf("/%s/test.yaml?ifGenerationMatch=%d", bucketName, objectGeneration),
|
||||||
|
fmt.Sprintf("/storage/v1/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName):
|
||||||
w.WriteHeader(200)
|
w.WriteHeader(200)
|
||||||
response := getObjectFile()
|
response := getObjectFile()
|
||||||
_, err = w.Write([]byte(response))
|
_, err = w.Write([]byte(response))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error writing response %v\n", err)
|
log.Fatalf("error writing response %v\n", err)
|
||||||
}
|
}
|
||||||
} else {
|
default:
|
||||||
w.WriteHeader(404)
|
w.WriteHeader(404)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
@ -109,14 +137,15 @@ func TestMain(m *testing.M) {
|
||||||
os.Exit(run)
|
os.Exit(run)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewClient(t *testing.T) {
|
func TestNewClientWithSecretErr(t *testing.T) {
|
||||||
gcpClient, err := gcp.NewClient(context.Background(), option.WithHTTPClient(hc))
|
gcpClient, err := NewClient(context.Background(), secret.DeepCopy())
|
||||||
assert.NilError(t, err)
|
t.Log(err)
|
||||||
assert.Assert(t, gcpClient != nil)
|
assert.Error(t, err, "dialing: invalid character 'e' looking for beginning of value")
|
||||||
|
assert.Assert(t, gcpClient == nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBucketExists(t *testing.T) {
|
func TestBucketExists(t *testing.T) {
|
||||||
gcpClient := &gcp.GCPClient{
|
gcpClient := &GCSClient{
|
||||||
Client: client,
|
Client: client,
|
||||||
}
|
}
|
||||||
exists, err := gcpClient.BucketExists(context.Background(), bucketName)
|
exists, err := gcpClient.BucketExists(context.Background(), bucketName)
|
||||||
|
|
@ -126,7 +155,7 @@ func TestBucketExists(t *testing.T) {
|
||||||
|
|
||||||
func TestBucketNotExists(t *testing.T) {
|
func TestBucketNotExists(t *testing.T) {
|
||||||
bucket := "notexistsbucket"
|
bucket := "notexistsbucket"
|
||||||
gcpClient := &gcp.GCPClient{
|
gcpClient := &GCSClient{
|
||||||
Client: client,
|
Client: client,
|
||||||
}
|
}
|
||||||
exists, err := gcpClient.BucketExists(context.Background(), bucket)
|
exists, err := gcpClient.BucketExists(context.Background(), bucket)
|
||||||
|
|
@ -134,55 +163,57 @@ func TestBucketNotExists(t *testing.T) {
|
||||||
assert.Assert(t, !exists)
|
assert.Assert(t, !exists)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestObjectExists(t *testing.T) {
|
func TestVisitObjects(t *testing.T) {
|
||||||
gcpClient := &gcp.GCPClient{
|
gcpClient := &GCSClient{
|
||||||
Client: client,
|
Client: client,
|
||||||
}
|
}
|
||||||
exists, err := gcpClient.ObjectExists(context.Background(), bucketName, objectName)
|
keys := []string{}
|
||||||
if err == gcpstorage.ErrObjectNotExist {
|
etags := []string{}
|
||||||
|
err := gcpClient.VisitObjects(context.Background(), bucketName, func(key, etag string) error {
|
||||||
|
keys = append(keys, key)
|
||||||
|
etags = append(etags, etag)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
}
|
assert.DeepEqual(t, keys, []string{objectName})
|
||||||
assert.NilError(t, err)
|
assert.DeepEqual(t, etags, []string{objectEtag})
|
||||||
assert.Assert(t, exists)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestObjectNotExists(t *testing.T) {
|
func TestVisitObjectsErr(t *testing.T) {
|
||||||
object := "doesnotexists.yaml"
|
gcpClient := &GCSClient{
|
||||||
gcpClient := &gcp.GCPClient{
|
|
||||||
Client: client,
|
Client: client,
|
||||||
}
|
}
|
||||||
exists, err := gcpClient.ObjectExists(context.Background(), bucketName, object)
|
badBucketName := "bad-bucket"
|
||||||
assert.Error(t, err, gcpstorage.ErrObjectNotExist.Error())
|
err := gcpClient.VisitObjects(context.Background(), badBucketName, func(key, etag string) error {
|
||||||
assert.Assert(t, !exists)
|
return nil
|
||||||
|
})
|
||||||
|
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestListObjects(t *testing.T) {
|
func TestVisitObjectsCallbackErr(t *testing.T) {
|
||||||
gcpClient := &gcp.GCPClient{
|
gcpClient := &GCSClient{
|
||||||
Client: client,
|
Client: client,
|
||||||
}
|
}
|
||||||
objectIterator := gcpClient.ListObjects(context.Background(), bucketName, nil)
|
mockErr := fmt.Errorf("mock")
|
||||||
for {
|
err := gcpClient.VisitObjects(context.Background(), bucketName, func(key, etag string) error {
|
||||||
_, err := objectIterator.Next()
|
return mockErr
|
||||||
if err == gcp.IteratorDone {
|
})
|
||||||
break
|
assert.Error(t, err, mockErr.Error())
|
||||||
}
|
|
||||||
assert.NilError(t, err)
|
|
||||||
}
|
|
||||||
assert.Assert(t, objectIterator != nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFGetObject(t *testing.T) {
|
func TestFGetObject(t *testing.T) {
|
||||||
tempDir, err := os.MkdirTemp("", bucketName)
|
tempDir, err := os.MkdirTemp("", bucketName)
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
defer os.RemoveAll(tempDir)
|
defer os.RemoveAll(tempDir)
|
||||||
gcpClient := &gcp.GCPClient{
|
gcpClient := &GCSClient{
|
||||||
Client: client,
|
Client: client,
|
||||||
}
|
}
|
||||||
localPath := filepath.Join(tempDir, objectName)
|
localPath := filepath.Join(tempDir, objectName)
|
||||||
err = gcpClient.FGetObject(context.Background(), bucketName, objectName, localPath)
|
etag, err := gcpClient.FGetObject(context.Background(), bucketName, objectName, localPath)
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
}
|
}
|
||||||
|
assert.Equal(t, etag, objectEtag)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFGetObjectNotExists(t *testing.T) {
|
func TestFGetObjectNotExists(t *testing.T) {
|
||||||
|
|
@ -190,24 +221,25 @@ func TestFGetObjectNotExists(t *testing.T) {
|
||||||
tempDir, err := os.MkdirTemp("", bucketName)
|
tempDir, err := os.MkdirTemp("", bucketName)
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
defer os.RemoveAll(tempDir)
|
defer os.RemoveAll(tempDir)
|
||||||
gcpClient := &gcp.GCPClient{
|
gcsClient := &GCSClient{
|
||||||
Client: client,
|
Client: client,
|
||||||
}
|
}
|
||||||
localPath := filepath.Join(tempDir, object)
|
localPath := filepath.Join(tempDir, object)
|
||||||
err = gcpClient.FGetObject(context.Background(), bucketName, object, localPath)
|
_, err = gcsClient.FGetObject(context.Background(), bucketName, object, localPath)
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
assert.Error(t, err, "storage: object doesn't exist")
|
assert.Error(t, err, "storage: object doesn't exist")
|
||||||
|
assert.Check(t, gcsClient.ObjectIsNotFound(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFGetObjectDirectoryIsFileName(t *testing.T) {
|
func TestFGetObjectDirectoryIsFileName(t *testing.T) {
|
||||||
tempDir, err := os.MkdirTemp("", bucketName)
|
tempDir, err := os.MkdirTemp("", bucketName)
|
||||||
defer os.RemoveAll(tempDir)
|
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
gcpClient := &gcp.GCPClient{
|
defer os.RemoveAll(tempDir)
|
||||||
|
gcpClient := &GCSClient{
|
||||||
Client: client,
|
Client: client,
|
||||||
}
|
}
|
||||||
err = gcpClient.FGetObject(context.Background(), bucketName, objectName, tempDir)
|
_, err = gcpClient.FGetObject(context.Background(), bucketName, objectName, tempDir)
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
assert.Error(t, err, "filename is a directory")
|
assert.Error(t, err, "filename is a directory")
|
||||||
}
|
}
|
||||||
|
|
@ -216,35 +248,27 @@ func TestFGetObjectDirectoryIsFileName(t *testing.T) {
|
||||||
func TestValidateSecret(t *testing.T) {
|
func TestValidateSecret(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
title string
|
|
||||||
secret map[string][]byte
|
|
||||||
name string
|
name string
|
||||||
|
secret *corev1.Secret
|
||||||
error bool
|
error bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
"Test Case 1",
|
name: "valid secret",
|
||||||
map[string][]byte{
|
secret: secret.DeepCopy(),
|
||||||
"serviceaccount": []byte("serviceaccount"),
|
|
||||||
},
|
|
||||||
"Service Account",
|
|
||||||
false,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"Test Case 2",
|
name: "invalid secret",
|
||||||
map[string][]byte{
|
secret: badSecret.DeepCopy(),
|
||||||
"data": []byte("data"),
|
error: true,
|
||||||
},
|
|
||||||
"Service Account",
|
|
||||||
true,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, testCase := range testCases {
|
for _, testCase := range testCases {
|
||||||
testCase := testCase
|
tt := testCase
|
||||||
t.Run(testCase.title, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
err := gcp.ValidateSecret(testCase.secret, testCase.name)
|
err := ValidateSecret(tt.secret)
|
||||||
if testCase.error {
|
if tt.error {
|
||||||
assert.Error(t, err, fmt.Sprintf("invalid '%v' secret data: required fields 'serviceaccount'", testCase.name))
|
assert.Error(t, err, fmt.Sprintf("invalid '%v' secret data: required fields 'serviceaccount'", tt.secret.Name))
|
||||||
} else {
|
} else {
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
}
|
}
|
||||||
|
|
@ -280,7 +304,10 @@ func getObject() *raw.Object {
|
||||||
ContentLanguage: "en-us",
|
ContentLanguage: "en-us",
|
||||||
Size: 1 << 20,
|
Size: 1 << 20,
|
||||||
CustomTime: customTime.Format(time.RFC3339),
|
CustomTime: customTime.Format(time.RFC3339),
|
||||||
Md5Hash: "bFbHCDvedeecefdgmfmhfuRxBdcedGe96S82XJOAXxjJpk=",
|
Generation: objectGeneration,
|
||||||
|
Metageneration: 3,
|
||||||
|
Etag: objectEtag,
|
||||||
|
Md5Hash: objectEtag,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,135 @@
|
||||||
|
/*
|
||||||
|
Copyright 2022 The Flux authors
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package minio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/minio/minio-go/v7"
|
||||||
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||||
|
"github.com/minio/minio-go/v7/pkg/s3utils"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
|
||||||
|
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MinioClient is a minimal Minio client for fetching files from S3 compatible
|
||||||
|
// storage APIs.
|
||||||
|
type MinioClient struct {
|
||||||
|
*minio.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClient creates a new Minio storage client.
|
||||||
|
func NewClient(bucket *sourcev1.Bucket, secret *corev1.Secret) (*MinioClient, error) {
|
||||||
|
opt := minio.Options{
|
||||||
|
Region: bucket.Spec.Region,
|
||||||
|
Secure: !bucket.Spec.Insecure,
|
||||||
|
BucketLookup: minio.BucketLookupPath,
|
||||||
|
}
|
||||||
|
|
||||||
|
if secret != nil {
|
||||||
|
var accessKey, secretKey string
|
||||||
|
if k, ok := secret.Data["accesskey"]; ok {
|
||||||
|
accessKey = string(k)
|
||||||
|
}
|
||||||
|
if k, ok := secret.Data["secretkey"]; ok {
|
||||||
|
secretKey = string(k)
|
||||||
|
}
|
||||||
|
if accessKey != "" && secretKey != "" {
|
||||||
|
opt.Creds = credentials.NewStaticV4(accessKey, secretKey, "")
|
||||||
|
}
|
||||||
|
} else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider {
|
||||||
|
opt.Creds = credentials.NewIAM("")
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := minio.New(bucket.Spec.Endpoint, &opt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &MinioClient{Client: client}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidateSecret validates the credential secret. The provided Secret may
|
||||||
|
// be nil.
|
||||||
|
func ValidateSecret(secret *corev1.Secret) error {
|
||||||
|
if secret == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err := fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
|
||||||
|
if _, ok := secret.Data["accesskey"]; !ok {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, ok := secret.Data["secretkey"]; !ok {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FGetObject gets the object from the provided object storage bucket, and
|
||||||
|
// writes it to targetPath.
|
||||||
|
// It returns the etag of the successfully fetched file, or any error.
|
||||||
|
func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) (string, error) {
|
||||||
|
stat, err := c.Client.StatObject(ctx, bucketName, objectName, minio.GetObjectOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
opts := minio.GetObjectOptions{}
|
||||||
|
if err = opts.SetMatchETag(stat.ETag); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if err = c.Client.FGetObject(ctx, bucketName, objectName, localPath, opts); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return stat.ETag, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// VisitObjects iterates over the items in the provided object storage
|
||||||
|
// bucket, calling visit for every item.
|
||||||
|
// If the underlying client or the visit callback returns an error,
|
||||||
|
// it returns early.
|
||||||
|
func (c *MinioClient) VisitObjects(ctx context.Context, bucketName string, visit func(key, etag string) error) error {
|
||||||
|
for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
|
||||||
|
Recursive: true,
|
||||||
|
UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()),
|
||||||
|
}) {
|
||||||
|
if object.Err != nil {
|
||||||
|
err := fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, object.Err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := visit(object.Key, object.ETag); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObjectIsNotFound checks if the error provided is a minio.ErrResponse
|
||||||
|
// with "NoSuchKey" code.
|
||||||
|
func (c *MinioClient) ObjectIsNotFound(err error) bool {
|
||||||
|
if resp := new(minio.ErrorResponse); errors.As(err, resp) {
|
||||||
|
return resp.Code == "NoSuchKey"
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the Minio Client and logs any useful errors.
|
||||||
|
func (c *MinioClient) Close(_ context.Context) {
|
||||||
|
// Minio client does not provide a close method
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,283 @@
|
||||||
|
/*
|
||||||
|
Copyright 2022 The Flux authors
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package minio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/fluxcd/pkg/apis/meta"
|
||||||
|
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||||
|
"github.com/fluxcd/source-controller/pkg/sourceignore"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
miniov7 "github.com/minio/minio-go/v7"
|
||||||
|
"gotest.tools/assert"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
objectName string = "test.yaml"
|
||||||
|
objectEtag string = "2020beab5f1711919157756379622d1d"
|
||||||
|
region string = "us-east-1"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
minioClient *MinioClient
|
||||||
|
bucketName = "test-bucket-minio" + uuid.New().String()
|
||||||
|
secret = corev1.Secret{
|
||||||
|
ObjectMeta: v1.ObjectMeta{
|
||||||
|
Name: "minio-secret",
|
||||||
|
Namespace: "default",
|
||||||
|
},
|
||||||
|
Data: map[string][]byte{
|
||||||
|
"accesskey": []byte("Q3AM3UQ867SPQQA43P2F"),
|
||||||
|
"secretkey": []byte("zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"),
|
||||||
|
},
|
||||||
|
Type: "Opaque",
|
||||||
|
}
|
||||||
|
emptySecret = corev1.Secret{
|
||||||
|
ObjectMeta: v1.ObjectMeta{
|
||||||
|
Name: "minio-secret",
|
||||||
|
Namespace: "default",
|
||||||
|
},
|
||||||
|
Data: map[string][]byte{},
|
||||||
|
Type: "Opaque",
|
||||||
|
}
|
||||||
|
bucket = sourcev1.Bucket{
|
||||||
|
ObjectMeta: v1.ObjectMeta{
|
||||||
|
Name: "minio-test-bucket",
|
||||||
|
Namespace: "default",
|
||||||
|
},
|
||||||
|
Spec: sourcev1.BucketSpec{
|
||||||
|
BucketName: bucketName,
|
||||||
|
Endpoint: "play.min.io",
|
||||||
|
Region: region,
|
||||||
|
Provider: "generic",
|
||||||
|
Insecure: true,
|
||||||
|
SecretRef: &meta.LocalObjectReference{
|
||||||
|
Name: secret.Name,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
bucketAwsProvider = sourcev1.Bucket{
|
||||||
|
ObjectMeta: v1.ObjectMeta{
|
||||||
|
Name: "minio-test-bucket",
|
||||||
|
Namespace: "default",
|
||||||
|
},
|
||||||
|
Spec: sourcev1.BucketSpec{
|
||||||
|
BucketName: bucketName,
|
||||||
|
Endpoint: "play.min.io",
|
||||||
|
Region: region,
|
||||||
|
Provider: "aws",
|
||||||
|
Insecure: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
var err error
|
||||||
|
ctx := context.Background()
|
||||||
|
minioClient, err = NewClient(bucket.DeepCopy(), secret.DeepCopy())
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
createBucket(ctx)
|
||||||
|
addObjectToBucket(ctx)
|
||||||
|
run := m.Run()
|
||||||
|
removeObjectFromBucket(ctx)
|
||||||
|
deleteBucket(ctx)
|
||||||
|
os.Exit(run)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewClient(t *testing.T) {
|
||||||
|
minioClient, err := NewClient(bucket.DeepCopy(), secret.DeepCopy())
|
||||||
|
assert.NilError(t, err)
|
||||||
|
assert.Assert(t, minioClient != nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewClientEmptySecret(t *testing.T) {
|
||||||
|
minioClient, err := NewClient(bucket.DeepCopy(), emptySecret.DeepCopy())
|
||||||
|
assert.NilError(t, err)
|
||||||
|
assert.Assert(t, minioClient != nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewClientAwsProvider(t *testing.T) {
|
||||||
|
minioClient, err := NewClient(bucketAwsProvider.DeepCopy(), nil)
|
||||||
|
assert.NilError(t, err)
|
||||||
|
assert.Assert(t, minioClient != nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBucketExists(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
exists, err := minioClient.BucketExists(ctx, bucketName)
|
||||||
|
assert.NilError(t, err)
|
||||||
|
assert.Assert(t, exists)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBucketNotExists(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
exists, err := minioClient.BucketExists(ctx, "notexistsbucket")
|
||||||
|
assert.NilError(t, err)
|
||||||
|
assert.Assert(t, !exists)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFGetObject(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
tempDir, err := os.MkdirTemp("", bucketName)
|
||||||
|
assert.NilError(t, err)
|
||||||
|
defer os.RemoveAll(tempDir)
|
||||||
|
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
|
||||||
|
_, err = minioClient.FGetObject(ctx, bucketName, objectName, path)
|
||||||
|
assert.NilError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFGetObjectNotExists(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
tempDir, err := os.MkdirTemp("", bucketName)
|
||||||
|
assert.NilError(t, err)
|
||||||
|
defer os.RemoveAll(tempDir)
|
||||||
|
badKey := "invalid.txt"
|
||||||
|
path := filepath.Join(tempDir, badKey)
|
||||||
|
_, err = minioClient.FGetObject(ctx, bucketName, badKey, path)
|
||||||
|
assert.Error(t, err, "The specified key does not exist.")
|
||||||
|
assert.Check(t, minioClient.ObjectIsNotFound(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVisitObjects(t *testing.T) {
|
||||||
|
keys := []string{}
|
||||||
|
etags := []string{}
|
||||||
|
err := minioClient.VisitObjects(context.TODO(), bucketName, func(key, etag string) error {
|
||||||
|
keys = append(keys, key)
|
||||||
|
etags = append(etags, etag)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
assert.NilError(t, err)
|
||||||
|
assert.DeepEqual(t, keys, []string{objectName})
|
||||||
|
assert.DeepEqual(t, etags, []string{objectEtag})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVisitObjectsErr(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
badBucketName := "bad-bucket"
|
||||||
|
err := minioClient.VisitObjects(ctx, badBucketName, func(string, string) error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVisitObjectsCallbackErr(t *testing.T) {
|
||||||
|
mockErr := fmt.Errorf("mock")
|
||||||
|
err := minioClient.VisitObjects(context.TODO(), bucketName, func(key, etag string) error {
|
||||||
|
return mockErr
|
||||||
|
})
|
||||||
|
assert.Error(t, err, mockErr.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidateSecret(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
secret *corev1.Secret
|
||||||
|
error bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "valid secret",
|
||||||
|
secret: secret.DeepCopy(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "nil secret",
|
||||||
|
secret: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid secret",
|
||||||
|
secret: emptySecret.DeepCopy(),
|
||||||
|
error: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
tt := testCase
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
err := ValidateSecret(tt.secret)
|
||||||
|
if tt.error {
|
||||||
|
assert.Error(t, err, fmt.Sprintf("invalid '%v' secret data: required fields 'accesskey' and 'secretkey'", tt.secret.Name))
|
||||||
|
} else {
|
||||||
|
assert.NilError(t, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createBucket(ctx context.Context) {
|
||||||
|
if err := minioClient.Client.MakeBucket(ctx, bucketName, miniov7.MakeBucketOptions{Region: region}); err != nil {
|
||||||
|
exists, errBucketExists := minioClient.BucketExists(ctx, bucketName)
|
||||||
|
if errBucketExists == nil && exists {
|
||||||
|
deleteBucket(ctx)
|
||||||
|
} else {
|
||||||
|
log.Fatalln(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteBucket(ctx context.Context) {
|
||||||
|
if err := minioClient.Client.RemoveBucket(ctx, bucketName); err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func addObjectToBucket(ctx context.Context) {
|
||||||
|
fileReader := strings.NewReader(getObjectFile())
|
||||||
|
fileSize := fileReader.Size()
|
||||||
|
_, err := minioClient.Client.PutObject(ctx, bucketName, objectName, fileReader, fileSize, miniov7.PutObjectOptions{
|
||||||
|
ContentType: "text/x-yaml",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func removeObjectFromBucket(ctx context.Context) {
|
||||||
|
if err := minioClient.Client.RemoveObject(ctx, bucketName, objectName, miniov7.RemoveObjectOptions{
|
||||||
|
GovernanceBypass: true,
|
||||||
|
}); err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getObjectFile() string {
|
||||||
|
return `
|
||||||
|
apiVersion: source.toolkit.fluxcd.io/v1beta2
|
||||||
|
kind: Bucket
|
||||||
|
metadata:
|
||||||
|
name: podinfo
|
||||||
|
namespace: default
|
||||||
|
spec:
|
||||||
|
interval: 5m
|
||||||
|
provider: aws
|
||||||
|
bucketName: podinfo
|
||||||
|
endpoint: s3.amazonaws.com
|
||||||
|
region: us-east-1
|
||||||
|
timeout: 30s
|
||||||
|
`
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue