Refactor Bucket Controller

Fix bug in bucket provider interface

Added Bucket Provider Interface

Signed-off-by: pa250194 <pa250194@ncr.com>

Fix context timeout defer issue

Signed-off-by: pa250194 <pa250194@ncr.com>

Fix GCP storage provider test

Signed-off-by: pa250194 <pa250194@ncr.com>
This commit is contained in:
pa250194 2021-10-19 12:13:37 -05:00
parent 79c19adf3f
commit 4105490d42
7 changed files with 701 additions and 248 deletions

View File

@ -26,10 +26,6 @@ import (
"time"
"github.com/go-logr/logr"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/s3utils"
"google.golang.org/api/option"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -48,9 +44,11 @@ import (
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
"github.com/fluxcd/source-controller/pkg/gcp"
"github.com/fluxcd/source-controller/pkg/minio"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
)
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
@ -72,6 +70,14 @@ type BucketReconcilerOptions struct {
MaxConcurrentReconciles int
}
type BucketProvider interface {
BucketExists(context.Context, string) (bool, error)
ObjectExists(context.Context, string, string) (bool, error)
FGetObject(context.Context, string, string, string) error
ListObjects(context.Context, gitignore.Matcher, string, string) error
Close(context.Context)
}
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
}
@ -178,25 +184,25 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) {
var err error
var sourceBucket sourcev1.Bucket
tempDir, err := os.MkdirTemp("", bucket.Name)
if err != nil {
err = fmt.Errorf("tmp dir error: %w", err)
return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
}
defer os.RemoveAll(tempDir)
if bucket.Spec.Provider == sourcev1.GoogleBucketProvider {
sourceBucket, err = r.reconcileWithGCP(ctx, bucket, tempDir)
if err != nil {
return sourceBucket, err
}
} else {
sourceBucket, err = r.reconcileWithMinio(ctx, bucket, tempDir)
if err != nil {
return sourceBucket, err
}
secretName := types.NamespacedName{
Namespace: bucket.GetNamespace(),
Name: bucket.Spec.SecretRef.Name,
}
var secret corev1.Secret
if err := r.Get(ctx, secretName, &secret); err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), fmt.Errorf("credentials secret error: %w", err)
}
if bucketResponse, err := registerBucketProviders(ctx, bucket, secret, tempDir); err != nil {
return bucketResponse, err
}
revision, err := r.checksum(tempDir)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
@ -241,9 +247,73 @@ func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket
}
message := fmt.Sprintf("Fetched revision: %s", artifact.Revision)
os.RemoveAll(tempDir)
return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil
}
// registerBucketProviders selects a bucket provider that implement the bucket provider interface based on
// on the specified provider in the bucket spec.
func registerBucketProviders(ctx context.Context, bucket sourcev1.Bucket, secret corev1.Secret, tempDir string) (sourcev1.Bucket, error) {
switch bucket.Spec.Provider {
case sourcev1.GoogleBucketProvider:
gcpClient, err := gcp.NewClient(ctx, secret, bucket)
if err != nil {
err = fmt.Errorf("auth error: %w", err)
return sourcev1.Bucket{}, err
}
if bucketResponse, err := reconcileAll(ctx, gcpClient, bucket, tempDir); err != nil {
return bucketResponse, err
}
default:
minioClient, err := minio.NewClient(ctx, secret, bucket)
if err != nil {
err = fmt.Errorf("auth error: %w", err)
return sourcev1.Bucket{}, err
}
if bucketResponse, err := reconcileAll(ctx, minioClient, bucket, tempDir); err != nil {
return bucketResponse, err
}
}
return sourcev1.Bucket{}, nil
}
func reconcileAll(ctx context.Context, client BucketProvider, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
defer cancel()
defer client.Close(ctx)
exists, err := client.BucketExists(ctxTimeout, bucket.Spec.BucketName)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
if !exists {
err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
// Look for file with ignore rules first.
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
}
ps, err := sourceignore.ReadIgnoreFile(path, nil)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
// In-spec patterns take precedence
if bucket.Spec.Ignore != nil {
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
}
matcher := sourceignore.NewMatcher(ps)
err = client.ListObjects(ctxTimeout, matcher, bucket.Spec.BucketName, tempDir)
if err != nil {
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
return sourcev1.Bucket{}, nil
}
func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {
if err := r.gc(bucket); err != nil {
r.event(ctx, bucket, events.EventSeverityError,
@ -265,216 +335,6 @@ func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.
return ctrl.Result{}, nil
}
// reconcileWithGCP handles getting objects from a Google Cloud Platform bucket
// using a gcp client
func (r *BucketReconciler) reconcileWithGCP(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
log := logr.FromContext(ctx)
gcpClient, err := r.authGCP(ctx, bucket)
if err != nil {
err = fmt.Errorf("auth error: %w", err)
return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err
}
defer gcpClient.Close(log)
ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
defer cancel()
exists, err := gcpClient.BucketExists(ctxTimeout, bucket.Spec.BucketName)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
if !exists {
err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
// Look for file with ignore rules first.
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
if err := gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
}
ps, err := sourceignore.ReadIgnoreFile(path, nil)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
// In-spec patterns take precedence
if bucket.Spec.Ignore != nil {
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
}
matcher := sourceignore.NewMatcher(ps)
objects := gcpClient.ListObjects(ctxTimeout, bucket.Spec.BucketName, nil)
// download bucket content
for {
object, err := objects.Next()
if err == gcp.IteratorDone {
break
}
if err != nil {
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile {
continue
}
if matcher.Match(strings.Split(object.Name, "/"), false) {
continue
}
localPath := filepath.Join(tempDir, object.Name)
if err = gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Name, localPath); err != nil {
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
}
return sourcev1.Bucket{}, nil
}
// reconcileWithMinio handles getting objects from an S3 compatible bucket
// using a minio client
func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
s3Client, err := r.authMinio(ctx, bucket)
if err != nil {
err = fmt.Errorf("auth error: %w", err)
return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err
}
ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
defer cancel()
exists, err := s3Client.BucketExists(ctxTimeout, bucket.Spec.BucketName)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
if !exists {
err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
// Look for file with ignore rules first
// NB: S3 has flat filepath keys making it impossible to look
// for files in "subdirectories" without building up a tree first.
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
if err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
}
ps, err := sourceignore.ReadIgnoreFile(path, nil)
if err != nil {
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
// In-spec patterns take precedence
if bucket.Spec.Ignore != nil {
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
}
matcher := sourceignore.NewMatcher(ps)
// download bucket content
for object := range s3Client.ListObjects(ctxTimeout, bucket.Spec.BucketName, minio.ListObjectsOptions{
Recursive: true,
UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
}) {
if object.Err != nil {
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, object.Err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile {
continue
}
if matcher.Match(strings.Split(object.Key, "/"), false) {
continue
}
localPath := filepath.Join(tempDir, object.Key)
err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Key, localPath, minio.GetObjectOptions{})
if err != nil {
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
}
}
return sourcev1.Bucket{}, nil
}
// authGCP creates a new Google Cloud Platform storage client
// to interact with the storage service.
func (r *BucketReconciler) authGCP(ctx context.Context, bucket sourcev1.Bucket) (*gcp.GCPClient, error) {
var client *gcp.GCPClient
var err error
if bucket.Spec.SecretRef != nil {
secretName := types.NamespacedName{
Namespace: bucket.GetNamespace(),
Name: bucket.Spec.SecretRef.Name,
}
var secret corev1.Secret
if err := r.Get(ctx, secretName, &secret); err != nil {
return nil, fmt.Errorf("credentials secret error: %w", err)
}
if err := gcp.ValidateSecret(secret.Data, secret.Name); err != nil {
return nil, err
}
client, err = gcp.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"]))
if err != nil {
return nil, err
}
} else {
client, err = gcp.NewClient(ctx)
if err != nil {
return nil, err
}
}
return client, nil
}
// authMinio creates a new Minio client to interact with S3
// compatible storage services.
func (r *BucketReconciler) authMinio(ctx context.Context, bucket sourcev1.Bucket) (*minio.Client, error) {
opt := minio.Options{
Region: bucket.Spec.Region,
Secure: !bucket.Spec.Insecure,
}
if bucket.Spec.SecretRef != nil {
secretName := types.NamespacedName{
Namespace: bucket.GetNamespace(),
Name: bucket.Spec.SecretRef.Name,
}
var secret corev1.Secret
if err := r.Get(ctx, secretName, &secret); err != nil {
return nil, fmt.Errorf("credentials secret error: %w", err)
}
accesskey := ""
secretkey := ""
if k, ok := secret.Data["accesskey"]; ok {
accesskey = string(k)
}
if k, ok := secret.Data["secretkey"]; ok {
secretkey = string(k)
}
if accesskey == "" || secretkey == "" {
return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
}
opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "")
} else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider {
opt.Creds = credentials.NewIAM("")
}
if opt.Creds == nil {
return nil, fmt.Errorf("no bucket credentials found")
}
return minio.New(bucket.Spec.Endpoint, &opt)
}
// checksum calculates the SHA1 checksum of the given root directory.
// It traverses the given root directory and calculates the checksum for any found file, and returns the SHA1 sum of the
// list with relative file paths and their checksums.

1
go.mod
View File

@ -23,6 +23,7 @@ require (
github.com/go-git/go-git/v5 v5.4.2
github.com/go-logr/logr v0.4.0
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gax-go/v2 v2.1.0 // indirect
github.com/libgit2/git2go/v31 v31.6.1
github.com/minio/minio-go/v7 v7.0.10

2
go.sum
View File

@ -487,6 +487,8 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.1.0 h1:6DWmvNpomjL1+3liNSZbVns3zsYzzCjm6pRBO1tLeso=

View File

@ -23,11 +23,17 @@ import (
"io"
"os"
"path/filepath"
"strings"
"sync"
gcpstorage "cloud.google.com/go/storage"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
"github.com/go-logr/logr"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
corev1 "k8s.io/api/core/v1"
)
var (
@ -50,13 +56,26 @@ type GCPClient struct {
// NewClient creates a new GCP storage client. The Client will automatically look for the Google Application
// Credential environment variable or look for the Google Application Credential file.
func NewClient(ctx context.Context, opts ...option.ClientOption) (*GCPClient, error) {
client, err := gcpstorage.NewClient(ctx, opts...)
if err != nil {
return nil, err
func NewClient(ctx context.Context, secret corev1.Secret, bucket sourcev1.Bucket) (*GCPClient, error) {
gcpclient := &GCPClient{}
if bucket.Spec.SecretRef != nil {
if err := ValidateSecret(secret.Data, secret.Name); err != nil {
return nil, err
}
client, err := gcpstorage.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"]))
if err != nil {
return nil, err
}
gcpclient.Client = client
} else {
client, err := gcpstorage.NewClient(ctx)
if err != nil {
return nil, err
}
gcpclient.Client = client
}
return &GCPClient{Client: client}, nil
return gcpclient, nil
}
// ValidateSecret validates the credential secrets
@ -158,15 +177,48 @@ func (c *GCPClient) FGetObject(ctx context.Context, bucketName, objectName, loca
// ListObjects lists the objects/contents of the bucket whose bucket name is provided.
// the objects are returned as an Objectiterator and .Next() has to be called on them
// to loop through the Objects.
func (c *GCPClient) ListObjects(ctx context.Context, bucketName string, query *gcpstorage.Query) *gcpstorage.ObjectIterator {
items := c.Client.Bucket(bucketName).Objects(ctx, query)
return items
// to loop through the Objects. The Object are downloaded using a goroutine.
func (c *GCPClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error {
log := logr.FromContext(ctx)
items := c.Client.Bucket(bucketName).Objects(ctx, nil)
var wg sync.WaitGroup
for {
object, err := items.Next()
if err == IteratorDone {
break
}
if err != nil {
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, err)
return err
}
wg.Add(1)
go func() {
defer wg.Done()
if err := DownloadObject(ctx, c, object, matcher, bucketName, tempDir); err != nil {
log.Error(err, fmt.Sprintf("Error downloading %s from bucket %s: ", object.Name, bucketName))
}
}()
}
wg.Wait()
return nil
}
// Close closes the GCP Client and logs any useful errors
func (c *GCPClient) Close(log logr.Logger) {
func (c *GCPClient) Close(ctx context.Context) {
log := logr.FromContext(ctx)
if err := c.Client.Close(); err != nil {
log.Error(err, "GCP Provider")
}
}
// DownloadObject gets an object and downloads the object locally.
func DownloadObject(ctx context.Context, cl *GCPClient, obj *gcpstorage.ObjectAttrs, matcher gitignore.Matcher, bucketName, tempDir string) error {
if strings.HasSuffix(obj.Name, "/") || obj.Name == sourceignore.IgnoreFile || matcher.Match(strings.Split(obj.Name, "/"), false) {
return nil
}
localPath := filepath.Join(tempDir, obj.Name)
if err := cl.FGetObject(ctx, bucketName, obj.Name, localPath); err != nil {
return err
}
return nil
}

View File

@ -33,10 +33,15 @@ import (
"time"
gcpstorage "cloud.google.com/go/storage"
"github.com/fluxcd/pkg/apis/meta"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/gcp"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"google.golang.org/api/googleapi"
raw "google.golang.org/api/storage/v1"
"gotest.tools/assert"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"google.golang.org/api/option"
)
@ -44,6 +49,7 @@ import (
const (
bucketName string = "test-bucket"
objectName string = "test.yaml"
region = "us-east-1"
)
var (
@ -51,6 +57,55 @@ var (
client *gcpstorage.Client
close func()
err error
secret = corev1.Secret{
ObjectMeta: v1.ObjectMeta{
Name: "gcp-secret",
Namespace: "default",
},
Data: map[string][]byte{
"serviceaccount": []byte("ewogICAgInR5cGUiOiAic2VydmljZV9hY2NvdW50IiwKICAgICJwcm9qZWN0X2lkIjogInBvZGluZm8iLAogICAgInByaXZhdGVfa2V5X2lkIjogIjI4cXdnaDNnZGY1aGozZ2I1ZmozZ3N1NXlmZ2gzNGY0NTMyNDU2OGh5MiIsCiAgICAicHJpdmF0ZV9rZXkiOiAiLS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tXG5Id2V0aGd5MTIzaHVnZ2hoaGJkY3U2MzU2ZGd5amhzdmd2R0ZESFlnY2RqYnZjZGhic3g2M2Ncbjc2dGd5Y2ZlaHVoVkdURllmdzZ0N3lkZ3lWZ3lkaGV5aHVnZ3ljdWhland5NnQzNWZ0aHl1aGVndmNldGZcblRGVUhHVHlnZ2h1Ymh4ZTY1eWd0NnRneWVkZ3kzMjZodWN5dnN1aGJoY3Zjc2poY3NqaGNzdmdkdEhGQ0dpXG5IY3llNnR5eWczZ2Z5dWhjaGNzYmh5Z2NpamRiaHl5VEY2NnR1aGNldnVoZGNiaHVoaHZmdGN1aGJoM3VoN3Q2eVxuZ2d2ZnRVSGJoNnQ1cmZ0aGh1R1ZSdGZqaGJmY3JkNXI2N3l1aHV2Z0ZUWWpndnRmeWdoYmZjZHJoeWpoYmZjdGZkZnlodmZnXG50Z3ZnZ3RmeWdodmZ0NnR1Z3ZURjVyNjZ0dWpoZ3ZmcnR5aGhnZmN0Nnk3eXRmcjVjdHZnaGJoaHZ0Z2hoanZjdHRmeWNmXG5mZnhmZ2hqYnZnY2d5dDY3dWpiZ3ZjdGZ5aFZDN3VodmdjeWp2aGhqdnl1amNcbmNnZ2hndmdjZmhnZzc2NTQ1NHRjZnRoaGdmdHloaHZ2eXZ2ZmZnZnJ5eXU3N3JlcmVkc3dmdGhoZ2ZjZnR5Y2ZkcnR0ZmhmL1xuLS0tLS1FTkQgUFJJVkFURSBLRVktLS0tLVxuIiwKICAgICJjbGllbnRfZW1haWwiOiAidGVzdEBwb2RpbmZvLmlhbS5nc2VydmljZWFjY291bnQuY29tIiwKICAgICJjbGllbnRfaWQiOiAiMzI2NTc2MzQ2Nzg3NjI1MzY3NDYiLAogICAgImF1dGhfdXJpIjogImh0dHBzOi8vYWNjb3VudHMuZ29vZ2xlLmNvbS9vL29hdXRoMi9hdXRoIiwKICAgICJ0b2tlbl91cmkiOiAiaHR0cHM6Ly9vYXV0aDIuZ29vZ2xlYXBpcy5jb20vdG9rZW4iLAogICAgImF1dGhfcHJvdmlkZXJfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9vYXV0aDIvdjEvY2VydHMiLAogICAgImNsaWVudF94NTA5X2NlcnRfdXJsIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL3JvYm90L3YxL21ldGFkYXRhL3g1MDkvdGVzdCU0MHBvZGluZm8uaWFtLmdzZXJ2aWNlYWNjb3VudC5jb20iCn0="),
},
Type: "Opaque",
}
badSecret = corev1.Secret{
ObjectMeta: v1.ObjectMeta{
Name: "gcp-secret",
Namespace: "default",
},
Data: map[string][]byte{
"username": []byte("test-user"),
},
Type: "Opaque",
}
bucket = sourcev1.Bucket{
ObjectMeta: v1.ObjectMeta{
Name: "gcp-test-bucket",
Namespace: "default",
},
Spec: sourcev1.BucketSpec{
BucketName: bucketName,
Endpoint: "storage.googleapis.com",
Region: region,
Provider: "gcp",
Insecure: true,
SecretRef: &meta.LocalObjectReference{
Name: secret.Name,
},
},
}
bucketNoSecretRef = sourcev1.Bucket{
ObjectMeta: v1.ObjectMeta{
Name: "gcp-test-bucket",
Namespace: "default",
},
Spec: sourcev1.BucketSpec{
BucketName: bucketName,
Endpoint: "storage.googleapis.com",
Region: region,
Provider: "gcp",
Insecure: true,
},
}
)
func TestMain(m *testing.M) {
@ -110,8 +165,16 @@ func TestMain(m *testing.M) {
os.Exit(run)
}
func TestNewClient(t *testing.T) {
gcpClient, err := gcp.NewClient(context.Background(), option.WithHTTPClient(hc))
func TestNewClientWithSecretErr(t *testing.T) {
gcpClient, err := gcp.NewClient(context.Background(), secret, bucket)
t.Log(err)
assert.Error(t, err, "dialing: invalid character 'e' looking for beginning of value")
assert.Assert(t, gcpClient == nil)
}
func TestNewClientWithoutSecretErr(t *testing.T) {
gcpClient, err := gcp.NewClient(context.Background(), badSecret, bucketNoSecretRef)
t.Log(err)
assert.NilError(t, err)
assert.Assert(t, gcpClient != nil)
}
@ -161,15 +224,33 @@ func TestListObjects(t *testing.T) {
gcpClient := &gcp.GCPClient{
Client: client,
}
objectIterator := gcpClient.ListObjects(context.Background(), bucketName, nil)
for {
_, err := objectIterator.Next()
if err == gcp.IteratorDone {
break
}
assert.NilError(t, err)
tempDir, err := os.MkdirTemp("", bucketName)
defer os.RemoveAll(tempDir)
assert.NilError(t, err)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = gcpClient.ListObjects(context.Background(), matcher, bucketName, tempDir)
assert.NilError(t, err)
}
func TestListObjectsErr(t *testing.T) {
gcpClient := &gcp.GCPClient{
Client: client,
}
assert.Assert(t, objectIterator != nil)
badBucketName := "bad-bucket"
tempDir, err := os.MkdirTemp("", badBucketName)
defer os.RemoveAll(tempDir)
assert.NilError(t, err)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = gcpClient.ListObjects(context.Background(), matcher, badBucketName, tempDir)
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: storage: bucket doesn't exist", badBucketName))
}
func TestFGetObject(t *testing.T) {
@ -203,8 +284,8 @@ func TestFGetObjectNotExists(t *testing.T) {
func TestFGetObjectDirectoryIsFileName(t *testing.T) {
tempDir, err := os.MkdirTemp("", bucketName)
defer os.RemoveAll(tempDir)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
gcpClient := &gcp.GCPClient{
Client: client,
}
@ -214,6 +295,66 @@ func TestFGetObjectDirectoryIsFileName(t *testing.T) {
}
}
func TestDownloadObject(t *testing.T) {
gcpClient := &gcp.GCPClient{
Client: client,
}
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{
Bucket: bucketName,
Name: objectName,
ContentType: "text/x-yaml",
Size: 1 << 20,
}, matcher, bucketName, tempDir)
assert.NilError(t, err)
}
func TestDownloadObjectErr(t *testing.T) {
gcpClient := &gcp.GCPClient{
Client: client,
}
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{
Bucket: bucketName,
Name: "test1.yaml",
ContentType: "text/x-yaml",
Size: 1 << 20,
}, matcher, bucketName, tempDir)
assert.Error(t, err, "storage: object doesn't exist")
}
func TestDownloadObjectSuffix(t *testing.T) {
gcpClient := &gcp.GCPClient{
Client: client,
}
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = gcp.DownloadObject(context.Background(), gcpClient, &gcpstorage.ObjectAttrs{
Bucket: bucketName,
Name: "test1/",
ContentType: "text/x-yaml",
Size: 1 << 20,
}, matcher, bucketName, tempDir)
assert.NilError(t, err)
}
func TestValidateSecret(t *testing.T) {
t.Parallel()
testCases := []struct {

127
pkg/minio/minio.go Normal file
View File

@ -0,0 +1,127 @@
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package minio
import (
"context"
"fmt"
"path/filepath"
"strings"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"github.com/go-git/go-git/v5/plumbing/format/gitignore"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/s3utils"
corev1 "k8s.io/api/core/v1"
)
type MinioClient struct {
// client for interacting with S3 compatible
// Storage APIs.
*minio.Client
}
// NewClient creates a new Minio storage client.
func NewClient(ctx context.Context, secret corev1.Secret, bucket sourcev1.Bucket) (*MinioClient, error) {
opt := minio.Options{
Region: bucket.Spec.Region,
Secure: !bucket.Spec.Insecure,
}
if bucket.Spec.SecretRef != nil {
accesskey := ""
secretkey := ""
if k, ok := secret.Data["accesskey"]; ok {
accesskey = string(k)
}
if k, ok := secret.Data["secretkey"]; ok {
secretkey = string(k)
}
if accesskey == "" || secretkey == "" {
return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
}
opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "")
} else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider {
opt.Creds = credentials.NewIAM("")
}
if opt.Creds == nil {
return nil, fmt.Errorf("no bucket credentials found")
}
client, err := minio.New(bucket.Spec.Endpoint, &opt)
if err != nil {
return nil, err
}
return &MinioClient{Client: client}, nil
}
// BucketExists checks if the bucket with the provided name exists.
func (c *MinioClient) BucketExists(ctx context.Context, bucketName string) (bool, error) {
return c.Client.BucketExists(ctx, bucketName)
}
// ObjectExists checks if the object with the provided name exists.
func (c *MinioClient) ObjectExists(ctx context.Context, bucketName, objectName string) (bool, error) {
_, err := c.Client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{})
if err != nil {
return false, err
}
return true, nil
}
// FGetObject gets the object from the bucket and downloads the object locally.
func (c *MinioClient) FGetObject(ctx context.Context, bucketName, objectName, localPath string) error {
return c.Client.FGetObject(ctx, bucketName, objectName, localPath, minio.GetObjectOptions{})
}
// ListObjects lists all the objects in a bucket and downloads the objects.
func (c *MinioClient) ListObjects(ctx context.Context, matcher gitignore.Matcher, bucketName, tempDir string) error {
for object := range c.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
Recursive: true,
UseV1: s3utils.IsGoogleEndpoint(*c.Client.EndpointURL()),
}) {
if object.Err != nil {
err := fmt.Errorf("listing objects from bucket '%s' failed: %w", bucketName, object.Err)
return err
}
if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile {
continue
}
if matcher.Match(strings.Split(object.Key, "/"), false) {
continue
}
localPath := filepath.Join(tempDir, object.Key)
err := c.FGetObject(ctx, bucketName, object.Key, localPath)
if err != nil {
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucketName, err)
return err
}
}
return nil
}
// Close closes the Minio Client and logs any useful errors
func (c *MinioClient) Close(ctx context.Context) {
//minio client does not provide a close method
}

270
pkg/minio/minio_test.go Normal file
View File

@ -0,0 +1,270 @@
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package minio_test
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"testing"
"github.com/fluxcd/pkg/apis/meta"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/minio"
"github.com/fluxcd/source-controller/pkg/sourceignore"
"github.com/google/uuid"
miniov7 "github.com/minio/minio-go/v7"
"gotest.tools/assert"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
objectName string = "test.yaml"
region string = "us-east-1"
)
var (
minioclient *minio.MinioClient
bucketName = "test-bucket-minio" + uuid.New().String()
secret = corev1.Secret{
ObjectMeta: v1.ObjectMeta{
Name: "minio-secret",
Namespace: "default",
},
Data: map[string][]byte{
"accesskey": []byte("Q3AM3UQ867SPQQA43P2F"),
"secretkey": []byte("zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"),
},
Type: "Opaque",
}
bucket = sourcev1.Bucket{
ObjectMeta: v1.ObjectMeta{
Name: "minio-test-bucket",
Namespace: "default",
},
Spec: sourcev1.BucketSpec{
BucketName: bucketName,
Endpoint: "play.min.io",
Region: region,
Provider: "generic",
Insecure: true,
SecretRef: &meta.LocalObjectReference{
Name: secret.Name,
},
},
}
emptySecret = corev1.Secret{
ObjectMeta: v1.ObjectMeta{
Name: "minio-secret",
Namespace: "default",
},
Data: map[string][]byte{},
Type: "Opaque",
}
bucketNoSecretRef = sourcev1.Bucket{
ObjectMeta: v1.ObjectMeta{
Name: "minio-test-bucket",
Namespace: "default",
},
Spec: sourcev1.BucketSpec{
BucketName: bucketName,
Endpoint: "play.min.io",
Region: region,
Provider: "generic",
Insecure: true,
},
}
bucketAwsProvider = sourcev1.Bucket{
ObjectMeta: v1.ObjectMeta{
Name: "minio-test-bucket",
Namespace: "default",
},
Spec: sourcev1.BucketSpec{
BucketName: bucketName,
Endpoint: "play.min.io",
Region: region,
Provider: "aws",
Insecure: true,
},
}
)
func TestMain(m *testing.M) {
var err error
ctx := context.Background()
minioclient, err = minio.NewClient(ctx, secret, bucket)
if err != nil {
log.Fatal(err)
}
createBucket(ctx)
addObjectToBucket(ctx)
run := m.Run()
removeObjectFromBucket(ctx)
deleteBucket(ctx)
//minioclient.Client.Close
os.Exit(run)
}
func TestNewClient(t *testing.T) {
ctx := context.Background()
minioClient, err := minio.NewClient(ctx, secret, bucket)
assert.NilError(t, err)
assert.Assert(t, minioClient != nil)
}
func TestNewClientEmptySecret(t *testing.T) {
ctx := context.Background()
minioClient, err := minio.NewClient(ctx, emptySecret, bucket)
assert.Error(t, err, fmt.Sprintf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", emptySecret.Name))
assert.Assert(t, minioClient == nil)
}
func TestNewClientNoSecretRef(t *testing.T) {
ctx := context.Background()
minioClient, err := minio.NewClient(ctx, corev1.Secret{}, bucketNoSecretRef)
assert.Error(t, err, "no bucket credentials found")
assert.Assert(t, minioClient == nil)
}
func TestNewClientAwsProvider(t *testing.T) {
ctx := context.Background()
minioClient, err := minio.NewClient(ctx, corev1.Secret{}, bucketAwsProvider)
assert.NilError(t, err)
assert.Assert(t, minioClient != nil)
}
func TestBucketExists(t *testing.T) {
ctx := context.Background()
exists, err := minioclient.BucketExists(ctx, bucketName)
assert.NilError(t, err)
assert.Assert(t, exists)
}
func TestBucketNotExists(t *testing.T) {
ctx := context.Background()
exists, err := minioclient.BucketExists(ctx, "notexistsbucket")
assert.NilError(t, err)
assert.Assert(t, !exists)
}
func TestObjectExists(t *testing.T) {
ctx := context.Background()
exists, err := minioclient.ObjectExists(ctx, bucketName, objectName)
assert.NilError(t, err)
assert.Assert(t, exists)
}
func TestObjectNotExists(t *testing.T) {
ctx := context.Background()
exists, err := minioclient.ObjectExists(ctx, bucketName, "notexists.yaml")
assert.Error(t, err, "The specified key does not exist.")
assert.Assert(t, !exists)
}
func TestFGetObject(t *testing.T) {
ctx := context.Background()
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
err = minioclient.FGetObject(ctx, bucketName, objectName, path)
assert.NilError(t, err)
}
func TestListObjects(t *testing.T) {
ctx := context.Background()
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = minioclient.ListObjects(ctx, matcher, bucketName, tempDir)
assert.NilError(t, err)
}
func TestListObjectsErr(t *testing.T) {
ctx := context.Background()
badBucketName := "bad-bucket"
tempDir, err := os.MkdirTemp("", bucketName)
assert.NilError(t, err)
defer os.RemoveAll(tempDir)
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
ps, err := sourceignore.ReadIgnoreFile(path, nil)
assert.NilError(t, err)
matcher := sourceignore.NewMatcher(ps)
err = minioclient.ListObjects(ctx, matcher, badBucketName, tempDir)
assert.Error(t, err, fmt.Sprintf("listing objects from bucket '%s' failed: The specified bucket does not exist", badBucketName))
}
func createBucket(ctx context.Context) {
if err := minioclient.Client.MakeBucket(ctx, bucketName, miniov7.MakeBucketOptions{Region: region}); err != nil {
exists, errBucketExists := minioclient.BucketExists(ctx, bucketName)
if errBucketExists == nil && exists {
deleteBucket(ctx)
} else {
log.Fatalln(err)
}
}
}
func deleteBucket(ctx context.Context) {
if err := minioclient.Client.RemoveBucket(ctx, bucketName); err != nil {
log.Println(err)
}
}
func addObjectToBucket(ctx context.Context) {
fileReader := strings.NewReader(getObjectFile())
fileSize := fileReader.Size()
_, err := minioclient.Client.PutObject(ctx, bucketName, objectName, fileReader, fileSize, miniov7.PutObjectOptions{
ContentType: "text/x-yaml",
})
if err != nil {
log.Println(err)
}
}
func removeObjectFromBucket(ctx context.Context) {
if err := minioclient.Client.RemoveObject(ctx, bucketName, objectName, miniov7.RemoveObjectOptions{
GovernanceBypass: true,
}); err != nil {
log.Println(err)
}
}
func getObjectFile() string {
return `
apiVersion: source.toolkit.fluxcd.io/v1beta1
kind: Bucket
metadata:
name: podinfo
namespace: default
spec:
interval: 5m
provider: aws
bucketName: podinfo
endpoint: s3.amazonaws.com
region: us-east-1
timeout: 30s
`
}