diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index bf3e3f51..d4a815ff 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -48,7 +48,6 @@ import ( sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/pkg/sourceignore" - "github.com/go-git/go-git/v5/plumbing/format/gitignore" ) // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete @@ -74,8 +73,8 @@ type BucketProvider interface { BucketExists(context.Context, string) (bool, error) ObjectExists(context.Context, string, string) (bool, error) FGetObject(context.Context, string, string, string) error - ListObjects(context.Context, gitignore.Matcher, string, string) error ObjectIsNotFound(error) bool + VisitObjects(context.Context, string, func(string) error) error Close(context.Context) } @@ -303,13 +302,13 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck } // Look for file with ignore rules first. - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { - if client.ObjectIsNotFound(err) && sourceignore.IgnoreFile != ".sourceignore" { + ignorefile := filepath.Join(tempDir, sourceignore.IgnoreFile) + if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, ignorefile); err != nil { + if client.ObjectIsNotFound(err) && sourceignore.IgnoreFile != ".sourceignore" { // FIXME? return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } } - ps, err := sourceignore.ReadIgnoreFile(path, nil) + ps, err := sourceignore.ReadIgnoreFile(ignorefile, nil) if err != nil { return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } @@ -318,12 +317,29 @@ func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Buck ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...) } matcher := sourceignore.NewMatcher(ps) - err = client.ListObjects(ctxTimeout, matcher, bucket.Spec.BucketName, tempDir) + + err = client.VisitObjects(ctxTimeout, bucket.Spec.BucketName, func(path string) error { + if strings.HasSuffix(path, "/") || path == sourceignore.IgnoreFile { + return nil + } + + if matcher.Match(strings.Split(path, "/"), false) { + return nil + } + + localPath := filepath.Join(tempDir, path) + err := client.FGetObject(ctx, bucket.Spec.BucketName, path, localPath) + if err != nil { + err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) + return err + } + return nil + }) if err != nil { - err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err) + err = fmt.Errorf("fetching objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err) return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } - return sourcev1.Bucket{}, nil + return bucket, nil } func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) { diff --git a/pkg/gcp/gcp.go b/pkg/gcp/gcp.go index 73cf8c2d..c0fe715e 100644 --- a/pkg/gcp/gcp.go +++ b/pkg/gcp/gcp.go @@ -22,15 +22,10 @@ import ( "io" "os" "path/filepath" - "strings" gcpstorage "cloud.google.com/go/storage" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" - "github.com/fluxcd/source-controller/pkg/sourceignore" - "github.com/go-git/go-git/v5/plumbing/format/gitignore" "github.com/go-logr/logr" - - "golang.org/x/sync/errgroup" "google.golang.org/api/iterator" "google.golang.org/api/option" corev1 "k8s.io/api/core/v1" @@ -178,10 +173,8 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca // ListObjects lists the objects/contents of the bucket whose bucket name is provided. // the objects are returned as an Objectiterator and .Next() has to be called on them // to loop through the Objects. The Object are downloaded using a goroutine. -func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error { - log := logr.FromContext(ctx) +func (c *GCPClient) VisitObjects(ctx context.Context, bucketName string, visit func(string) error) error { items := c.Client.Bucket(bucketName).Objects(ctx, nil) - g, ctxx := errgroup.WithContext(ctx) for { object, err := items.Next() if err == IteratorDone { @@ -191,19 +184,10 @@ func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err) return err } - if !(strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile || matcher.Match(strings.Split(object.Name, "/"), false)) { - g.Go(func() error { - if err := DownloadObject(ctxx, c, object, matcher, bucketName, tempDir); err != nil { - log.Error(err, fmt.Sprintf("Error downloading %s from bucket %s: ", object.Name, bucketName)) - return err - } - return nil - }) + if err = visit(object.Name); err != nil { + return err } } - if err := g.Wait(); err != nil { - return err - } return nil } @@ -219,12 +203,3 @@ func (c *GCPClient) Close(ctx context.Context) { func (c *GCPClient) ObjectIsNotFound(err error) bool { return errors.Is(err, ErrorObjectDoesNotExist) } - -// DownloadObject gets an object and downloads the object locally. -func DownloadObject(ctx context.Context, cl *GCPClient, obj *gcpstorage.ObjectAttrs, matcher gitignore.Matcher, bucketName, tempDir string) error { - localPath := filepath.Join(tempDir, obj.Name) - if err := cl.FGetObject(ctx, bucketName, obj.Name, localPath); err != nil { - return err - } - return nil -} diff --git a/pkg/gcp/gcp_test.go b/pkg/gcp/gcp_test.go index 70d2d57b..12b1223e 100644 --- a/pkg/gcp/gcp_test.go +++ b/pkg/gcp/gcp_test.go @@ -36,7 +36,6 @@ import ( "github.com/fluxcd/pkg/apis/meta" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/pkg/gcp" - "github.com/fluxcd/source-controller/pkg/sourceignore" "google.golang.org/api/googleapi" raw "google.golang.org/api/storage/v1" "gotest.tools/assert" @@ -174,7 +173,6 @@ func TestNewClientWithSecretErr(t *testing.T) { func TestNewClientWithoutSecretErr(t *testing.T) { gcpClient, err := gcp.NewClient(context.Background(), badSecret, bucketNoSecretRef) - t.Log(err) assert.NilError(t, err) assert.Assert(t, gcpClient != nil) } @@ -220,36 +218,28 @@ func TestObjectNotExists(t *testing.T) { assert.Assert(t, !exists) } -func TestListObjects(t *testing.T) { +func TestVisitObjects(t *testing.T) { gcpClient := &gcp.GCPClient{ Client: client, } - tempDir, err := os.MkdirTemp("", bucketName) - defer os.RemoveAll(tempDir) - assert.NilError(t, err) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - err = gcpClient.ListObjects(context.Background(), matcher, bucketName, tempDir) + objs := []string{} + err := gcpClient.VisitObjects(context.Background(), bucketName, func(path string) error { + objs = append(objs, path) + return nil + }) assert.NilError(t, err) + assert.DeepEqual(t, objs, []string{}) } -func TestListObjectsErr(t *testing.T) { +func TestVisitObjectsErr(t *testing.T) { gcpClient := &gcp.GCPClient{ Client: client, } badBucketName := "bad-bucket" - tempDir, err := os.MkdirTemp("", badBucketName) - defer os.RemoveAll(tempDir) - assert.NilError(t, err) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - - err = gcpClient.ListObjects(context.Background(), matcher, badBucketName, tempDir) + err := gcpClient.VisitObjects(context.Background(), badBucketName, func(path string) error { + return nil + }) assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName)) } @@ -295,46 +285,6 @@ func TestFGetObjectDirectoryIsFileName(t *testing.T) { } } -func TestDownloadObject(t *testing.T) { - gcpClient := &gcp.GCPClient{ - Client: client, - } - tempDir, err := os.MkdirTemp("", bucketName) - assert.NilError(t, err) - defer os.RemoveAll(tempDir) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{ - Bucket: bucketName, - Name: objectName, - ContentType: "text/x-yaml", - Size: 1 << 20, - }, matcher, bucketName, tempDir) - assert.NilError(t, err) -} - -func TestDownloadObjectErr(t *testing.T) { - gcpClient := &gcp.GCPClient{ - Client: client, - } - tempDir, err := os.MkdirTemp("", bucketName) - assert.NilError(t, err) - defer os.RemoveAll(tempDir) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{ - Bucket: bucketName, - Name: "test1.yaml", - ContentType: "text/x-yaml", - Size: 1 << 20, - }, matcher, bucketName, tempDir) - assert.Error(t, err, "storage: object doesn't exist") -} - func TestValidateSecret(t *testing.T) { t.Parallel() testCases := []struct { diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go index 64b11cce..df4ab64e 100644 --- a/pkg/minio/minio.go +++ b/pkg/minio/minio.go @@ -19,12 +19,8 @@ package minio import ( "context" "fmt" - "path/filepath" - "strings" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" - "github.com/fluxcd/source-controller/pkg/sourceignore" - "github.com/go-git/go-git/v5/plumbing/format/gitignore" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/s3utils" @@ -88,7 +84,7 @@ func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, lo } // ListObjects lists all the objects in a bucket and downloads the objects. -func (c *MinioClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error { +func (c *MinioClient) VisitObjects(ctx context.Context, bucketName string, visit func(string) error) error { for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{ Recursive: true, UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()), @@ -98,18 +94,7 @@ func (c *MinioClient) ListObjects(ctx context.Context, matcher gitignore.Matcher return err } - if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile { - continue - } - - if matcher.Match(strings.Split(object.Key, "/"), false) { - continue - } - - localPath := filepath.Join(tempDir, object.Key) - err := c.FGetObject(ctx, bucketName, object.Key, localPath) - if err != nil { - err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucketName, err) + if err := visit(object.Key); err != nil { return err } } diff --git a/pkg/minio/minio_test.go b/pkg/minio/minio_test.go index ae2c2f65..43622ee3 100644 --- a/pkg/minio/minio_test.go +++ b/pkg/minio/minio_test.go @@ -188,30 +188,23 @@ func TestFGetObject(t *testing.T) { assert.NilError(t, err) } -func TestListObjects(t *testing.T) { +func TestVisitObjects(t *testing.T) { ctx := context.Background() - tempDir, err := os.MkdirTemp("", bucketName) - assert.NilError(t, err) - defer os.RemoveAll(tempDir) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - err = minioclient.ListObjects(ctx, matcher, bucketName, tempDir) + objs := []string{} + err := minioclient.VisitObjects(ctx, bucketName, func(path string) error { + objs = append(objs, path) + return nil + }) assert.NilError(t, err) + assert.DeepEqual(t, objs, []string{}) } -func TestListObjectsErr(t *testing.T) { +func TestVisitObjectsErr(t *testing.T) { ctx := context.Background() badBucketName := "bad-bucket" - tempDir, err := os.MkdirTemp("", bucketName) - assert.NilError(t, err) - defer os.RemoveAll(tempDir) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - err = minioclient.ListObjects(ctx, matcher, badBucketName, tempDir) + err := minioclient.VisitObjects(ctx, badBucketName, func(string) error { + return nil + }) assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName)) }