From db1e8dbbc8711ff967bc77c4c7b27c30a1f8f719 Mon Sep 17 00:00:00 2001 From: pa250194 Date: Tue, 2 Nov 2021 15:06:32 -0500 Subject: [PATCH] Added errgroup withcontext General cleanup and function renaming Signed-off-by: pa250194 Fix: errgroup provides waitgroup within the Go function Signed-off-by: pa250194 --- controllers/bucket_controller.go | 44 ++++++++++++++++++++------------ pkg/gcp/gcp.go | 36 +++++++++++++++----------- pkg/gcp/gcp_test.go | 20 --------------- pkg/minio/minio.go | 11 ++++---- pkg/minio/minio_test.go | 1 - 5 files changed, 55 insertions(+), 57 deletions(-) diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index fc953b3b..bf3e3f51 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -75,6 +75,7 @@ type BucketProvider interface { ObjectExists(context.Context, string, string) (bool, error) FGetObject(context.Context, string, string, string) error ListObjects(context.Context, gitignore.Matcher, string, string) error + ObjectIsNotFound(error) bool Close(context.Context) } @@ -183,23 +184,35 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{RequeueAfter: bucket.GetInterval().Duration}, nil } +func (r *BucketReconciler) getBucketSecret(ctx context.Context, bucket sourcev1.Bucket) (corev1.Secret, error) { + var secret corev1.Secret + secretName := types.NamespacedName{ + Namespace: bucket.GetNamespace(), + Name: bucket.Spec.SecretRef.Name, + } + if err := r.Get(ctx, secretName, &secret); err != nil { + return secret, err + } + return secret, nil +} + func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) { tempDir, err := os.MkdirTemp("", bucket.Name) if err != nil { err = fmt.Errorf("tmp dir error: %w", err) return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err } - secretName := types.NamespacedName{ - Namespace: bucket.GetNamespace(), - Name: bucket.Spec.SecretRef.Name, - } - + defer os.RemoveAll(tempDir) var secret corev1.Secret - if err := r.Get(ctx, secretName, &secret); err != nil { - return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), fmt.Errorf("credentials secret error: %w", err) + + if bucket.Spec.SecretRef != nil { + secret, err = r.getBucketSecret(ctx, bucket) + if err != nil { + return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), fmt.Errorf("credentials secret error: %w", err) + } } - if bucketResponse, err := registerBucketProviders(ctx, bucket, secret, tempDir); err != nil { + if bucketResponse, err := fetchBucketContents(ctx, bucket, secret, tempDir); err != nil { return bucketResponse, err } @@ -247,13 +260,12 @@ func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket } message := fmt.Sprintf("Fetched revision: %s", artifact.Revision) - os.RemoveAll(tempDir) return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil } -// registerBucketProviders selects a bucket provider that implement the bucket provider interface based on +// fetchBucketContents selects a bucket provider that implement the bucket provider interface based on // on the specified provider in the bucket spec. -func registerBucketProviders(ctx context.Context, bucket sourcev1.Bucket, secret corev1.Secret, tempDir string) (sourcev1.Bucket, error) { +func fetchBucketContents(ctx context.Context, bucket sourcev1.Bucket, secret corev1.Secret, tempDir string) (sourcev1.Bucket, error) { switch bucket.Spec.Provider { case sourcev1.GoogleBucketProvider: gcpClient, err := gcp.NewClient(ctx, secret, bucket) @@ -261,7 +273,8 @@ func registerBucketProviders(ctx context.Context, bucket sourcev1.Bucket, secret err = fmt.Errorf("auth error: %w", err) return sourcev1.Bucket{}, err } - if bucketResponse, err := reconcileAll(ctx, gcpClient, bucket, tempDir); err != nil { + defer gcpClient.Close(ctx) + if bucketResponse, err := fetchFiles(ctx, gcpClient, bucket, tempDir); err != nil { return bucketResponse, err } default: @@ -270,17 +283,16 @@ func registerBucketProviders(ctx context.Context, bucket sourcev1.Bucket, secret err = fmt.Errorf("auth error: %w", err) return sourcev1.Bucket{}, err } - if bucketResponse, err := reconcileAll(ctx, minioClient, bucket, tempDir); err != nil { + if bucketResponse, err := fetchFiles(ctx, minioClient, bucket, tempDir); err != nil { return bucketResponse, err } } return sourcev1.Bucket{}, nil } -func reconcileAll(ctx context.Context, client BucketProvider, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { +func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration) defer cancel() - defer client.Close(ctx) exists, err := client.BucketExists(ctxTimeout, bucket.Spec.BucketName) if err != nil { return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err @@ -293,7 +305,7 @@ func reconcileAll(ctx context.Context, client BucketProvider, bucket sourcev1.Bu // Look for file with ignore rules first. path := filepath.Join(tempDir, sourceignore.IgnoreFile) if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { - if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" { + if client.ObjectIsNotFound(err) && sourceignore.IgnoreFile != ".sourceignore" { return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err } } diff --git a/pkg/gcp/gcp.go b/pkg/gcp/gcp.go index bcf34219..73cf8c2d 100644 --- a/pkg/gcp/gcp.go +++ b/pkg/gcp/gcp.go @@ -13,7 +13,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - package gcp import ( @@ -24,13 +23,14 @@ import ( "os" "path/filepath" "strings" - "sync" gcpstorage "cloud.google.com/go/storage" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" "github.com/fluxcd/source-controller/pkg/sourceignore" "github.com/go-git/go-git/v5/plumbing/format/gitignore" "github.com/go-logr/logr" + + "golang.org/x/sync/errgroup" "google.golang.org/api/iterator" "google.golang.org/api/option" corev1 "k8s.io/api/core/v1" @@ -181,7 +181,7 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error { log := logr.FromContext(ctx) items := c.Client.Bucket(bucketName).Objects(ctx, nil) - var wg sync.WaitGroup + g, ctxx := errgroup.WithContext(ctx) for { object, err := items.Next() if err == IteratorDone { @@ -191,15 +191,19 @@ func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err) return err } - wg.Add(1) - go func() { - defer wg.Done() - if err := DownloadObject(ctx, c, object, matcher, bucketName, tempDir); err != nil { - log.Error(err, fmt.Sprintf("Error downloading %s from bucket %s: ", object.Name, bucketName)) - } - }() + if !(strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile || matcher.Match(strings.Split(object.Name, "/"), false)) { + g.Go(func() error { + if err := DownloadObject(ctxx, c, object, matcher, bucketName, tempDir); err != nil { + log.Error(err, fmt.Sprintf("Error downloading %s from bucket %s: ", object.Name, bucketName)) + return err + } + return nil + }) + } + } + if err := g.Wait(); err != nil { + return err } - wg.Wait() return nil } @@ -207,15 +211,17 @@ func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, func (c *GCPClient) Close(ctx context.Context) { log := logr.FromContext(ctx) if err := c.Client.Close(); err != nil { - log.Error(err, "GCP Provider") + log.Error(err, "closing GCP client") } } +// ObjectIsNotFound checks if the error provided is ErrorObjectDoesNotExist(object does not exist) +func (c *GCPClient) ObjectIsNotFound(err error) bool { + return errors.Is(err, ErrorObjectDoesNotExist) +} + // DownloadObject gets an object and downloads the object locally. func DownloadObject(ctx context.Context, cl *GCPClient, obj *gcpstorage.ObjectAttrs, matcher gitignore.Matcher, bucketName, tempDir string) error { - if strings.HasSuffix(obj.Name, "/") || obj.Name == sourceignore.IgnoreFile || matcher.Match(strings.Split(obj.Name, "/"), false) { - return nil - } localPath := filepath.Join(tempDir, obj.Name) if err := cl.FGetObject(ctx, bucketName, obj.Name, localPath); err != nil { return err diff --git a/pkg/gcp/gcp_test.go b/pkg/gcp/gcp_test.go index fac237db..70d2d57b 100644 --- a/pkg/gcp/gcp_test.go +++ b/pkg/gcp/gcp_test.go @@ -335,26 +335,6 @@ func TestDownloadObjectErr(t *testing.T) { assert.Error(t, err, "storage: object doesn't exist") } -func TestDownloadObjectSuffix(t *testing.T) { - gcpClient := &gcp.GCPClient{ - Client: client, - } - tempDir, err := os.MkdirTemp("", bucketName) - assert.NilError(t, err) - defer os.RemoveAll(tempDir) - path := filepath.Join(tempDir, sourceignore.IgnoreFile) - ps, err := sourceignore.ReadIgnoreFile(path, nil) - assert.NilError(t, err) - matcher := sourceignore.NewMatcher(ps) - err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{ - Bucket: bucketName, - Name: "test1/", - ContentType: "text/x-yaml", - Size: 1 << 20, - }, matcher, bucketName, tempDir) - assert.NilError(t, err) -} - func TestValidateSecret(t *testing.T) { t.Parallel() testCases := []struct { diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go index 4dd221da..64b11cce 100644 --- a/pkg/minio/minio.go +++ b/pkg/minio/minio.go @@ -73,11 +73,6 @@ func NewClient(ctx context.Context, secret corev1.Secret, bucket sourcev1.Bucket return &MinioClient{Client: client}, nil } -// BucketExists checks if the bucket with the provided name exists. -func (c *MinioClient) BucketExists(ctx context.Context, bucketName string) (bool, error) { - return c.Client.BucketExists(ctx, bucketName) -} - // ObjectExists checks if the object with the provided name exists. func (c *MinioClient) ObjectExists(ctx context.Context, bucketName, objectName string) (bool, error) { _, err := c.Client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{}) @@ -125,3 +120,9 @@ func (c *MinioClient) ListObjects(ctx context.Context, matcher gitignore.Matcher func (c *MinioClient) Close(ctx context.Context) { //minio client does not provide a close method } + +// ObjectIsNotFound checks if the error provided is NoSuchKey(object does not exist) +func (c *MinioClient) ObjectIsNotFound(err error) bool { + resp, ok := err.(minio.ErrorResponse) + return ok && resp.Code != "NoSuchKey" +} diff --git a/pkg/minio/minio_test.go b/pkg/minio/minio_test.go index 6d814397..ae2c2f65 100644 --- a/pkg/minio/minio_test.go +++ b/pkg/minio/minio_test.go @@ -119,7 +119,6 @@ func TestMain(m *testing.M) { run := m.Run() removeObjectFromBucket(ctx) deleteBucket(ctx) - //minioclient.Client.Close os.Exit(run) }