Factor out fetching objects

The algorithm for conditionally downloading object files is the same,
whether you are using GCP storage or an S3/Minio-compatible
bucket. The only thing that differs is how the respective clients
handle enumerating through the objects in the bucket; by implementing
just that in each provider, I can have the select-and-fetch code in
once place.

This deliberately omits the parallelised fetching that the GCP client
had, for the sake of lining the clients up. It can be reintroduced (in
the factored out code) later.

Signed-off-by: Michael Bridgen <michael@weave.works>
This commit is contained in:
Michael Bridgen 2021-11-02 17:33:38 +00:00
parent 9b1850c908
commit d51bddc263
5 changed files with 52 additions and 133 deletions

View File

@ -48,7 +48,6 @@ import (
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
)
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
@ -74,8 +73,8 @@ type BucketProvider interface {
BucketExists(context.Context, string) (bool, error)
ObjectExists(context.Context, string, string) (bool, error)
FGetObject(context.Context, string, string, string) error
ListObjects(context.Context, gitignore.Matcher, string, string) error
ObjectIsNotFound(error) bool
VisitObjects(context.Context, string, func(string) error) error
Close(context.Context)
}
@ -303,13 +302,13 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck
}
// Look for file with ignore rules first.
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
if client.ObjectIsNotFound(err) && sourceignore.IgnoreFile != ".sourceignore" {
ignorefile := filepath.Join(tempDir, sourceignore.IgnoreFile)
if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, ignorefile); err != nil {
if client.ObjectIsNotFound(err) && sourceignore.IgnoreFile != ".sourceignore" { // FIXME?
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
}
ps, err := sourceignore.ReadIgnoreFile(path, nil)
ps, err := sourceignore.ReadIgnoreFile(ignorefile, nil)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
@ -318,12 +317,29 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
}
matcher := sourceignore.NewMatcher(ps)
err = client.ListObjects(ctxTimeout, matcher, bucket.Spec.BucketName, tempDir)
err = client.VisitObjects(ctxTimeout, bucket.Spec.BucketName, func(path string) error {
if strings.HasSuffix(path, "/") || path == sourceignore.IgnoreFile {
return nil
}
if matcher.Match(strings.Split(path, "/"), false) {
return nil
}
localPath := filepath.Join(tempDir, path)
err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath)
if err != nil {
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return err
}
return nil
})
if err != nil {
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
err = fmt.Errorf("fetching objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
return sourcev1.Bucket{}, nil
return bucket, nil
}
func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {

View File

@ -22,15 +22,10 @@ import (
"io"
"os"
"path/filepath"
"strings"
gcpstorage "cloud.google.com/go/storage"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
"github.com/go-logr/logr"
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
corev1 "k8s.io/api/core/v1"
@ -178,10 +173,8 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca
// ListObjects lists the objects/contents of the bucket whose bucket name is provided.
// the objects are returned as an Objectiterator and .Next() has to be called on them
// to loop through the Objects. The Object are downloaded using a goroutine.
func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error {
log := logr.FromContext(ctx)
func (c *GCPClient) VisitObjects(ctx context.Context, bucketName string, visit func(string) error) error {
items := c.Client.Bucket(bucketName).Objects(ctx, nil)
g, ctxx := errgroup.WithContext(ctx)
for {
object, err := items.Next()
if err == IteratorDone {
@ -191,19 +184,10 @@ func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher,
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err)
return err
}
if !(strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile || matcher.Match(strings.Split(object.Name, "/"), false)) {
g.Go(func() error {
if err := DownloadObject(ctxx, c, object, matcher, bucketName, tempDir); err != nil {
log.Error(err, fmt.Sprintf("Error downloading %s from bucket %s: ", object.Name, bucketName))
return err
}
return nil
})
if err = visit(object.Name); err != nil {
return err
}
}
if err := g.Wait(); err != nil {
return err
}
return nil
}
@ -219,12 +203,3 @@ func (c *GCPClient) Close(ctx context.Context) {
func (c *GCPClient) ObjectIsNotFound(err error) bool {
return errors.Is(err, ErrorObjectDoesNotExist)
}
// DownloadObject gets an object and downloads the object locally.
func DownloadObject(ctx context.Context, cl *GCPClient, obj *gcpstorage.ObjectAttrs, matcher gitignore.Matcher, bucketName, tempDir string) error {
localPath := filepath.Join(tempDir, obj.Name)
if err := cl.FGetObject(ctx, bucketName, obj.Name, localPath); err != nil {
return err
}
return nil
}

View File

@ -36,7 +36,6 @@ import (
"github.com/fluxcd/pkg/apis/meta"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/gcp"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"google.golang.org/api/googleapi"
raw "google.golang.org/api/storage/v1"
"gotest.tools/assert"
@ -174,7 +173,6 @@ func TestNewClientWithSecretErr(t *testing.T) {
func TestNewClientWithoutSecretErr(t *testing.T) {
gcpClient, err := gcp.NewClient(context.Background(), badSecret, bucketNoSecretRef)
t.Log(err)
assert.NilError(t, err)
assert.Assert(t, gcpClient != nil)
}
@ -220,36 +218,28 @@ func TestObjectNotExists(t *testing.T) {
assert.Assert(t, !exists)
}
func TestListObjects(t *testing.T) {
func TestVisitObjects(t *testing.T) {
gcpClient := &gcp.GCPClient{
Client: client,
}
tempDir, err := os.MkdirTemp("", bucketName)
defer os.RemoveAll(tempDir)
assert.NilError(t, err)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = gcpClient.ListObjects(context.Background(), matcher, bucketName, tempDir)
objs := []string{}
err := gcpClient.VisitObjects(context.Background(), bucketName, func(path string) error {
objs = append(objs, path)
return nil
})
assert.NilError(t, err)
assert.DeepEqual(t, objs, []string{})
}
func TestListObjectsErr(t *testing.T) {
func TestVisitObjectsErr(t *testing.T) {
gcpClient := &gcp.GCPClient{
Client: client,
}
badBucketName := "bad-bucket"
tempDir, err := os.MkdirTemp("", badBucketName)
defer os.RemoveAll(tempDir)
assert.NilError(t, err)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = gcpClient.ListObjects(context.Background(), matcher, badBucketName, tempDir)
err := gcpClient.VisitObjects(context.Background(), badBucketName, func(path string) error {
return nil
})
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName))
}
@ -295,46 +285,6 @@ func TestFGetObjectDirectoryIsFileName(t *testing.T) {
}
}
func TestDownloadObject(t *testing.T) {
gcpClient := &gcp.GCPClient{
Client: client,
}
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{
Bucket: bucketName,
Name: objectName,
ContentType: "text/x-yaml",
Size: 1 << 20,
}, matcher, bucketName, tempDir)
assert.NilError(t, err)
}
func TestDownloadObjectErr(t *testing.T) {
gcpClient := &gcp.GCPClient{
Client: client,
}
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{
Bucket: bucketName,
Name: "test1.yaml",
ContentType: "text/x-yaml",
Size: 1 << 20,
}, matcher, bucketName, tempDir)
assert.Error(t, err, "storage: object doesn't exist")
}
func TestValidateSecret(t *testing.T) {
t.Parallel()
testCases := []struct {

View File

@ -19,12 +19,8 @@ package minio
import (
"context"
"fmt"
"path/filepath"
"strings"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/s3utils"
@ -88,7 +84,7 @@ func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, lo
}
// ListObjects lists all the objects in a bucket and downloads the objects.
func (c *MinioClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error {
func (c *MinioClient) VisitObjects(ctx context.Context, bucketName string, visit func(string) error) error {
for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
Recursive: true,
UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()),
@ -98,18 +94,7 @@ func (c *MinioClient) ListObjects(ctx context.Context, matcher gitignore.Matcher
return err
}
if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile {
continue
}
if matcher.Match(strings.Split(object.Key, "/"), false) {
continue
}
localPath := filepath.Join(tempDir, object.Key)
err := c.FGetObject(ctx, bucketName, object.Key, localPath)
if err != nil {
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucketName, err)
if err := visit(object.Key); err != nil {
return err
}
}

View File

@ -188,30 +188,23 @@ func TestFGetObject(t *testing.T) {
assert.NilError(t, err)
}
func TestListObjects(t *testing.T) {
func TestVisitObjects(t *testing.T) {
ctx := context.Background()
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = minioclient.ListObjects(ctx, matcher, bucketName, tempDir)
objs := []string{}
err := minioclient.VisitObjects(ctx, bucketName, func(path string) error {
objs = append(objs, path)
return nil
})
assert.NilError(t, err)
assert.DeepEqual(t, objs, []string{})
}
func TestListObjectsErr(t *testing.T) {
func TestVisitObjectsErr(t *testing.T) {
ctx := context.Background()
badBucketName := "bad-bucket"
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = minioclient.ListObjects(ctx, matcher, badBucketName, tempDir)
err := minioclient.VisitObjects(ctx, badBucketName, func(string) error {
return nil
})
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName))
}