Added errgroup withcontext

General cleanup and function renaming

Signed-off-by: pa250194 <pa250194@ncr.com>

Fix: errgroup provides waitgroup within the Go function

Signed-off-by: pa250194 <pa250194@ncr.com>
This commit is contained in:
pa250194 2021-11-02 15:06:32 -05:00
parent fb6024ed3d
commit db1e8dbbc8
5 changed files with 55 additions and 57 deletions

View File

@ -75,6 +75,7 @@ type BucketProvider interface {
ObjectExists(context.Context, string, string) (bool, error)
FGetObject(context.Context, string, string, string) error
ListObjects(context.Context, gitignore.Matcher, string, string) error
ObjectIsNotFound(error) bool
Close(context.Context)
}
@ -183,23 +184,35 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{RequeueAfter: bucket.GetInterval().Duration}, nil
}
func (r *BucketReconciler) getBucketSecret(ctx context.Context, bucket sourcev1.Bucket) (corev1.Secret, error) {
var secret corev1.Secret
secretName := types.NamespacedName{
Namespace: bucket.GetNamespace(),
Name: bucket.Spec.SecretRef.Name,
}
if err := r.Get(ctx, secretName, &secret); err != nil {
return secret, err
}
return secret, nil
}
func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) {
tempDir, err := os.MkdirTemp("", bucket.Name)
if err != nil {
err = fmt.Errorf("tmp dir error: %w", err)
return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
}
secretName := types.NamespacedName{
Namespace: bucket.GetNamespace(),
Name: bucket.Spec.SecretRef.Name,
}
defer os.RemoveAll(tempDir)
var secret corev1.Secret
if err := r.Get(ctx, secretName, &secret); err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), fmt.Errorf("credentials secret error: %w", err)
if bucket.Spec.SecretRef != nil {
secret, err = r.getBucketSecret(ctx, bucket)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), fmt.Errorf("credentials secret error: %w", err)
}
}
if bucketResponse, err := registerBucketProviders(ctx, bucket, secret, tempDir); err != nil {
if bucketResponse, err := fetchBucketContents(ctx, bucket, secret, tempDir); err != nil {
return bucketResponse, err
}
@ -247,13 +260,12 @@ func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket
}
message := fmt.Sprintf("Fetched revision: %s", artifact.Revision)
os.RemoveAll(tempDir)
return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil
}
// registerBucketProviders selects a bucket provider that implement the bucket provider interface based on
// fetchBucketContents selects a bucket provider that implement the bucket provider interface based on
// on the specified provider in the bucket spec.
func registerBucketProviders(ctx context.Context, bucket sourcev1.Bucket, secret corev1.Secret, tempDir string) (sourcev1.Bucket, error) {
func fetchBucketContents(ctx context.Context, bucket sourcev1.Bucket, secret corev1.Secret, tempDir string) (sourcev1.Bucket, error) {
switch bucket.Spec.Provider {
case sourcev1.GoogleBucketProvider:
gcpClient, err := gcp.NewClient(ctx, secret, bucket)
@ -261,7 +273,8 @@ func registerBucketProviders(ctx context.Context, bucket sourcev1.Bucket, secret
err = fmt.Errorf("auth error: %w", err)
return sourcev1.Bucket{}, err
}
if bucketResponse, err := reconcileAll(ctx, gcpClient, bucket, tempDir); err != nil {
defer gcpClient.Close(ctx)
if bucketResponse, err := fetchFiles(ctx, gcpClient, bucket, tempDir); err != nil {
return bucketResponse, err
}
default:
@ -270,17 +283,16 @@ func registerBucketProviders(ctx context.Context, bucket sourcev1.Bucket, secret
err = fmt.Errorf("auth error: %w", err)
return sourcev1.Bucket{}, err
}
if bucketResponse, err := reconcileAll(ctx, minioClient, bucket, tempDir); err != nil {
if bucketResponse, err := fetchFiles(ctx, minioClient, bucket, tempDir); err != nil {
return bucketResponse, err
}
}
return sourcev1.Bucket{}, nil
}
func reconcileAll(ctx context.Context, client BucketProvider, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
defer cancel()
defer client.Close(ctx)
exists, err := client.BucketExists(ctxTimeout, bucket.Spec.BucketName)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
@ -293,7 +305,7 @@ func reconcileAll(ctx context.Context, client BucketProvider, bucket sourcev1.Bu
// Look for file with ignore rules first.
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" {
if client.ObjectIsNotFound(err) && sourceignore.IgnoreFile != ".sourceignore" {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
}

View File

@ -13,7 +13,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gcp
import (
@ -24,13 +23,14 @@ import (
"os"
"path/filepath"
"strings"
"sync"
gcpstorage "cloud.google.com/go/storage"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
"github.com/go-logr/logr"
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
corev1 "k8s.io/api/core/v1"
@ -181,7 +181,7 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca
func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error {
log := logr.FromContext(ctx)
items := c.Client.Bucket(bucketName).Objects(ctx, nil)
var wg sync.WaitGroup
g, ctxx := errgroup.WithContext(ctx)
for {
object, err := items.Next()
if err == IteratorDone {
@ -191,15 +191,19 @@ func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher,
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err)
return err
}
wg.Add(1)
go func() {
defer wg.Done()
if err := DownloadObject(ctx, c, object, matcher, bucketName, tempDir); err != nil {
log.Error(err, fmt.Sprintf("Error downloading %s from bucket %s: ", object.Name, bucketName))
}
}()
if !(strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile || matcher.Match(strings.Split(object.Name, "/"), false)) {
g.Go(func() error {
if err := DownloadObject(ctxx, c, object, matcher, bucketName, tempDir); err != nil {
log.Error(err, fmt.Sprintf("Error downloading %s from bucket %s: ", object.Name, bucketName))
return err
}
return nil
})
}
}
if err := g.Wait(); err != nil {
return err
}
wg.Wait()
return nil
}
@ -207,15 +211,17 @@ func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher,
func (c *GCPClient) Close(ctx context.Context) {
log := logr.FromContext(ctx)
if err := c.Client.Close(); err != nil {
log.Error(err, "GCP Provider")
log.Error(err, "closing GCP client")
}
}
// ObjectIsNotFound checks if the error provided is ErrorObjectDoesNotExist(object does not exist)
func (c *GCPClient) ObjectIsNotFound(err error) bool {
return errors.Is(err, ErrorObjectDoesNotExist)
}
// DownloadObject gets an object and downloads the object locally.
func DownloadObject(ctx context.Context, cl *GCPClient, obj *gcpstorage.ObjectAttrs, matcher gitignore.Matcher, bucketName, tempDir string) error {
if strings.HasSuffix(obj.Name, "/") || obj.Name == sourceignore.IgnoreFile || matcher.Match(strings.Split(obj.Name, "/"), false) {
return nil
}
localPath := filepath.Join(tempDir, obj.Name)
if err := cl.FGetObject(ctx, bucketName, obj.Name, localPath); err != nil {
return err

View File

@ -335,26 +335,6 @@ func TestDownloadObjectErr(t *testing.T) {
assert.Error(t, err, "storage: object doesn't exist")
}
func TestDownloadObjectSuffix(t *testing.T) {
gcpClient := &gcp.GCPClient{
Client: client,
}
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{
Bucket: bucketName,
Name: "test1/",
ContentType: "text/x-yaml",
Size: 1 << 20,
}, matcher, bucketName, tempDir)
assert.NilError(t, err)
}
func TestValidateSecret(t *testing.T) {
t.Parallel()
testCases := []struct {

View File

@ -73,11 +73,6 @@ func NewClient(ctx context.Context, secret corev1.Secret, bucket sourcev1.Bucket
return &MinioClient{Client: client}, nil
}
// BucketExists checks if the bucket with the provided name exists.
func (c *MinioClient) BucketExists(ctx context.Context, bucketName string) (bool, error) {
return c.Client.BucketExists(ctx, bucketName)
}
// ObjectExists checks if the object with the provided name exists.
func (c *MinioClient) ObjectExists(ctx context.Context, bucketName, objectName string) (bool, error) {
_, err := c.Client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{})
@ -125,3 +120,9 @@ func (c *MinioClient) ListObjects(ctx context.Context, matcher gitignore.Matcher
func (c *MinioClient) Close(ctx context.Context) {
//minio client does not provide a close method
}
// ObjectIsNotFound checks if the error provided is NoSuchKey(object does not exist)
func (c *MinioClient) ObjectIsNotFound(err error) bool {
resp, ok := err.(minio.ErrorResponse)
return ok && resp.Code != "NoSuchKey"
}

View File

@ -119,7 +119,6 @@ func TestMain(m *testing.M) {
run := m.Run()
removeObjectFromBucket(ctx)
deleteBucket(ctx)
//minioclient.Client.Close
os.Exit(run)
}