/*
Copyright 2020 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
	"context"
	"crypto/sha1"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/go-logr/logr"
	"golang.org/x/sync/errgroup"
	corev1 "k8s.io/api/core/v1"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	kuberecorder "k8s.io/client-go/tools/record"
	"k8s.io/client-go/tools/reference"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	"github.com/fluxcd/pkg/apis/meta"
	"github.com/fluxcd/pkg/runtime/events"
	"github.com/fluxcd/pkg/runtime/metrics"
	"github.com/fluxcd/pkg/runtime/predicates"
	"github.com/fluxcd/source-controller/pkg/gcp"
	"github.com/fluxcd/source-controller/pkg/minio"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
	"github.com/fluxcd/source-controller/pkg/sourceignore"
)

// maxConcurrentFetches is the upper bound on the goroutines used to
// fetch bucket objects. It's important to have a bound, to avoid
// using arbitrary amounts of memory; the actual number is chosen
// according to the queueing rule of thumb with some conservative
// parameters:
//   s > Nr / T
//   N (number of requestors, i.e., objects to fetch) = 10000
//   r (service time -- fetch duration) = 0.01s (~ a megabyte file over 1Gb/s)
//   T (total time available) = 1s
//   -> s > 100
const maxConcurrentFetches = 100

// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch

// BucketReconciler reconciles a Bucket object.
type BucketReconciler struct {
	client.Client
	Scheme                *runtime.Scheme
	Storage               *Storage
	EventRecorder         kuberecorder.EventRecorder
	ExternalEventRecorder *events.Recorder
	MetricsRecorder       *metrics.Recorder
}

// BucketReconcilerOptions holds the configurable options for the reconciler.
type BucketReconcilerOptions struct {
	MaxConcurrentReconciles int
}

// BucketProvider is an interface for fetching objects from a storage
// provider bucket.
type BucketProvider interface {
	// BucketExists checks if the named bucket exists.
	BucketExists(context.Context, string) (bool, error)
	// ObjectExists checks if the named object exists in the named bucket.
	ObjectExists(context.Context, string, string) (bool, error)
	// FGetObject downloads the named object from the named bucket to the
	// given local path.
	FGetObject(context.Context, string, string, string) error
	// ObjectIsNotFound returns true if the given error signals that the
	// requested object does not exist.
	ObjectIsNotFound(error) bool
	// VisitObjects iterates over the objects in the named bucket, calling
	// the given callback with each object key.
	VisitObjects(context.Context, string, func(string) error) error
	// Close releases any resources held by the provider client.
	Close(context.Context)
}
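// The following is an illustrative sketch, not part of the controller: a
// minimal in-memory BucketProvider that could back a unit test. Every name
// and behavior here is an assumption made for demonstration purposes; the
// real providers live in pkg/gcp and pkg/minio.
type fakeBucketProvider struct {
	// objects maps "bucket/key" to file contents.
	objects map[string][]byte
}

// Compile-time check that the fake satisfies the interface.
var _ BucketProvider = &fakeBucketProvider{}

func (f *fakeBucketProvider) BucketExists(_ context.Context, name string) (bool, error) {
	for k := range f.objects {
		if strings.HasPrefix(k, name+"/") {
			return true, nil
		}
	}
	return false, nil
}

func (f *fakeBucketProvider) ObjectExists(_ context.Context, bucket, key string) (bool, error) {
	_, ok := f.objects[bucket+"/"+key]
	return ok, nil
}

func (f *fakeBucketProvider) FGetObject(_ context.Context, bucket, key, localPath string) error {
	data, ok := f.objects[bucket+"/"+key]
	if !ok {
		return os.ErrNotExist
	}
	if err := os.MkdirAll(filepath.Dir(localPath), 0o700); err != nil {
		return err
	}
	return os.WriteFile(localPath, data, 0o600)
}

func (f *fakeBucketProvider) ObjectIsNotFound(err error) bool {
	// FGetObject above returns os.ErrNotExist for missing objects.
	return os.IsNotExist(err)
}

func (f *fakeBucketProvider) VisitObjects(_ context.Context, bucket string, visit func(string) error) error {
	for k := range f.objects {
		if strings.HasPrefix(k, bucket+"/") {
			if err := visit(strings.TrimPrefix(k, bucket+"/")); err != nil {
				return err
			}
		}
	}
	return nil
}

func (f *fakeBucketProvider) Close(_ context.Context) {}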
// SetupWithManager registers the reconciler with the given manager using
// default options.
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
}

// SetupWithManagerAndOptions registers the reconciler with the given
// manager using the given options.
func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts BucketReconcilerOptions) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&sourcev1.Bucket{}).
		WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{})).
		WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
		Complete(r)
}

func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	start := time.Now()
	log := ctrl.LoggerFrom(ctx)

	var bucket sourcev1.Bucket
	if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Record suspended status metric
	defer r.recordSuspension(ctx, bucket)

	// Add our finalizer if it does not exist
	if !controllerutil.ContainsFinalizer(&bucket, sourcev1.SourceFinalizer) {
		controllerutil.AddFinalizer(&bucket, sourcev1.SourceFinalizer)
		if err := r.Update(ctx, &bucket); err != nil {
			log.Error(err, "unable to register finalizer")
			return ctrl.Result{}, err
		}
	}

	// Examine if the object is under deletion
	if !bucket.ObjectMeta.DeletionTimestamp.IsZero() {
		return r.reconcileDelete(ctx, bucket)
	}

	// Return early if the object is suspended
	if bucket.Spec.Suspend {
		log.Info("Reconciliation is suspended for this object")
		return ctrl.Result{}, nil
	}

	// Record reconciliation duration
	if r.MetricsRecorder != nil {
		objRef, err := reference.GetReference(r.Scheme, &bucket)
		if err != nil {
			return ctrl.Result{}, err
		}
		defer r.MetricsRecorder.RecordDuration(*objRef, start)
	}

	// Set initial status
	if resetBucket, ok := r.resetStatus(bucket); ok {
		bucket = resetBucket
		if err := r.updateStatus(ctx, req, bucket.Status); err != nil {
			log.Error(err, "unable to update status")
			return ctrl.Result{Requeue: true}, err
		}
		r.recordReadiness(ctx, bucket)
	}

	// Record the value of the reconciliation request, if any.
	// TODO(hidde): would be better to defer this in combination with
	//  always patching the status sub-resource after a reconciliation.
	if v, ok := meta.ReconcileAnnotationValue(bucket.GetAnnotations()); ok {
		bucket.Status.SetLastHandledReconcileRequest(v)
	}

	// Purge old artifacts from storage
	if err := r.gc(bucket); err != nil {
		log.Error(err, "unable to purge old artifacts")
	}

	// Reconcile the bucket by downloading its content
	reconciledBucket, reconcileErr := r.reconcile(ctx, *bucket.DeepCopy())

	// Update status with the reconciliation result
	if err := r.updateStatus(ctx, req, reconciledBucket.Status); err != nil {
		log.Error(err, "unable to update status")
		return ctrl.Result{Requeue: true}, err
	}

	// If the reconciliation failed, record the failure and requeue immediately
	if reconcileErr != nil {
		r.event(ctx, reconciledBucket, events.EventSeverityError, reconcileErr.Error())
		r.recordReadiness(ctx, reconciledBucket)
		return ctrl.Result{Requeue: true}, reconcileErr
	}

	// Emit a revision change event if the artifact revision changed
	if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision {
		r.event(ctx, reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
	}
	r.recordReadiness(ctx, reconciledBucket)

	log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
		time.Since(start).String(),
		bucket.GetInterval().Duration.String(),
	))

	return ctrl.Result{RequeueAfter: bucket.GetInterval().Duration}, nil
}
// getBucketSecret retrieves the secret referenced by the given Bucket.
func (r *BucketReconciler) getBucketSecret(ctx context.Context, bucket sourcev1.Bucket) (corev1.Secret, error) {
	var secret corev1.Secret
	secretName := types.NamespacedName{
		Namespace: bucket.GetNamespace(),
		Name:      bucket.Spec.SecretRef.Name,
	}
	if err := r.Get(ctx, secretName, &secret); err != nil {
		return secret, err
	}
	return secret, nil
}

func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) {
	tempDir, err := os.MkdirTemp("", bucket.Name)
	if err != nil {
		err = fmt.Errorf("tmp dir error: %w", err)
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}
	defer os.RemoveAll(tempDir)

	var secret corev1.Secret
	if bucket.Spec.SecretRef != nil {
		secret, err = r.getBucketSecret(ctx, bucket)
		if err != nil {
			return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()),
				fmt.Errorf("credentials secret error: %w", err)
		}
	}

	if bucketResponse, err := fetchBucketContents(ctx, bucket, secret, tempDir); err != nil {
		return bucketResponse, err
	}

	revision, err := r.checksum(tempDir)
	if err != nil {
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}

	// Return early on an unchanged revision
	artifact := r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), revision, fmt.Sprintf("%s.tar.gz", revision))
	if apimeta.IsStatusConditionTrue(bucket.Status.Conditions, meta.ReadyCondition) && bucket.GetArtifact().HasRevision(artifact.Revision) {
		if artifact.URL != bucket.GetArtifact().URL {
			r.Storage.SetArtifactURL(bucket.GetArtifact())
			bucket.Status.URL = r.Storage.SetHostname(bucket.Status.URL)
		}
		return bucket, nil
	}

	// Create the artifact dir
	err = r.Storage.MkdirAll(artifact)
	if err != nil {
		err = fmt.Errorf("mkdir dir error: %w", err)
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}

	// Acquire a lock on the artifact
	unlock, err := r.Storage.Lock(artifact)
	if err != nil {
		err = fmt.Errorf("unable to acquire lock: %w", err)
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}
	defer unlock()

	// Archive the artifact and check its integrity
	if err := r.Storage.Archive(&artifact, tempDir, nil); err != nil {
		err = fmt.Errorf("storage archive error: %w", err)
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}

	// Update the latest symlink
	url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
	if err != nil {
		err = fmt.Errorf("storage symlink error: %w", err)
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}

	message := fmt.Sprintf("Fetched revision: %s", artifact.Revision)
	return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil
}
// fetchBucketContents selects a client that implements the BucketProvider
// interface based on the provider specified in the Bucket spec, and uses it
// to download the bucket contents into the given temporary directory.
func fetchBucketContents(ctx context.Context, bucket sourcev1.Bucket, secret corev1.Secret, tempDir string) (sourcev1.Bucket, error) {
	switch bucket.Spec.Provider {
	case sourcev1.GoogleBucketProvider:
		gcpClient, err := gcp.NewClient(ctx, secret, bucket)
		if err != nil {
			err = fmt.Errorf("auth error: %w", err)
			return sourcev1.Bucket{}, err
		}
		defer gcpClient.Close(ctx)
		if bucketResponse, err := fetchFiles(ctx, gcpClient, bucket, tempDir); err != nil {
			return bucketResponse, err
		}
	default:
		minioClient, err := minio.NewClient(ctx, secret, bucket)
		if err != nil {
			err = fmt.Errorf("auth error: %w", err)
			return sourcev1.Bucket{}, err
		}
		defer minioClient.Close(ctx)
		if bucketResponse, err := fetchFiles(ctx, minioClient, bucket, tempDir); err != nil {
			return bucketResponse, err
		}
	}
	return sourcev1.Bucket{}, nil
}

// fetchFiles downloads all objects from the bucket that are not excluded by
// the ignore rules, bounding the download concurrency to maxConcurrentFetches.
func fetchFiles(ctx context.Context, client BucketProvider, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
	ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
	defer cancel()

	exists, err := client.BucketExists(ctxTimeout, bucket.Spec.BucketName)
	if err != nil {
		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
	}
	if !exists {
		err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName)
		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
	}

	// Look for a file with ignore rules first. The ignore file is optional,
	// so only errors other than "object not found" are fatal.
	ignorefile := filepath.Join(tempDir, sourceignore.IgnoreFile)
	if err := client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, ignorefile); err != nil {
		if !client.ObjectIsNotFound(err) {
			return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
		}
	}
	ps, err := sourceignore.ReadIgnoreFile(ignorefile, nil)
	if err != nil {
		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
	}
	// In-spec patterns take precedence
	if bucket.Spec.Ignore != nil {
		ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
	}
	matcher := sourceignore.NewMatcher(ps)

	// Download in parallel, but bound the concurrency. According to
	// AWS and GCP docs, rate limits are either soft or don't exist:
	//  - https://cloud.google.com/storage/quotas
	//  - https://docs.aws.amazon.com/general/latest/gr/s3.html
	// .. so, the limiting factor is this process keeping a small footprint.
	semaphore := make(chan struct{}, maxConcurrentFetches)
	group, groupCtx := errgroup.WithContext(ctx)
	err = client.VisitObjects(ctxTimeout, bucket.Spec.BucketName, func(path string) error {
		if strings.HasSuffix(path, "/") || path == sourceignore.IgnoreFile {
			return nil
		}
		if matcher.Match(strings.Split(path, "/"), false) {
			return nil
		}
		// Block until there is capacity for another fetch
		semaphore <- struct{}{}
		group.Go(func() error {
			defer func() { <-semaphore }()
			localPath := filepath.Join(tempDir, path)
			if err := client.FGetObject(groupCtx, bucket.Spec.BucketName, path, localPath); err != nil {
				return fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
			}
			return nil
		})
		return nil
	})
	if err != nil {
		err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
	}
	if err := group.Wait(); err != nil {
		err = fmt.Errorf("fetching objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
	}
	return bucket, nil
}
func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {
	if err := r.gc(bucket); err != nil {
		r.event(ctx, bucket, events.EventSeverityError,
			fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
		// Return the error so we retry the failed garbage collection
		return ctrl.Result{}, err
	}

	// Record deleted status
	r.recordReadiness(ctx, bucket)

	// Remove our finalizer from the list and update it
	controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer)
	if err := r.Update(ctx, &bucket); err != nil {
		return ctrl.Result{}, err
	}

	// Stop reconciliation as the object is being deleted
	return ctrl.Result{}, nil
}

// checksum calculates the SHA1 checksum of the given root directory.
// It traverses the given root directory and calculates the checksum for any
// found file, and returns the SHA1 sum of the list with relative file paths
// and their checksums.
func (r *BucketReconciler) checksum(root string) (string, error) {
	sum := sha1.New()
	if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.Mode().IsRegular() {
			return nil
		}
		data, err := os.ReadFile(path)
		if err != nil {
			return err
		}
		relPath, err := filepath.Rel(root, path)
		if err != nil {
			return err
		}
		sum.Write([]byte(fmt.Sprintf("%x %s\n", sha1.Sum(data), relPath)))
		return nil
	}); err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", sum.Sum(nil)), nil
}
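// Illustrative sketch, not part of the controller: checksum above is a pure
// function of the directory contents and the relative file paths
// (filepath.Walk visits files in lexical order), so the same tree always
// yields the same revision. This hypothetical test-style helper demonstrates
// that property; the function and file names are assumptions.
func exampleChecksumIsDeterministic() error {
	dir, err := os.MkdirTemp("", "checksum-example")
	if err != nil {
		return err
	}
	defer os.RemoveAll(dir)
	if err := os.WriteFile(filepath.Join(dir, "a.yaml"), []byte("a: 1\n"), 0o600); err != nil {
		return err
	}

	r := &BucketReconciler{} // checksum does not touch any reconciler state
	first, err := r.checksum(dir)
	if err != nil {
		return err
	}
	second, err := r.checksum(dir)
	if err != nil {
		return err
	}
	if first != second {
		return fmt.Errorf("expected identical revisions, got %s and %s", first, second)
	}
	return nil
}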
// resetStatus returns a modified v1beta1.Bucket and a boolean indicating
// if the status field has been reset.
func (r *BucketReconciler) resetStatus(bucket sourcev1.Bucket) (sourcev1.Bucket, bool) {
	// We do not have an artifact, or it no longer exists
	if bucket.GetArtifact() == nil || !r.Storage.ArtifactExist(*bucket.GetArtifact()) {
		bucket = sourcev1.BucketProgressing(bucket)
		bucket.Status.Artifact = nil
		return bucket, true
	}
	if bucket.Generation != bucket.Status.ObservedGeneration {
		return sourcev1.BucketProgressing(bucket), true
	}
	return bucket, false
}

// gc performs a garbage collection for the given v1beta1.Bucket.
// It removes all but the current artifact, except when the deletion
// timestamp is set, in which case it removes all artifacts for the resource.
func (r *BucketReconciler) gc(bucket sourcev1.Bucket) error {
	if !bucket.DeletionTimestamp.IsZero() {
		return r.Storage.RemoveAll(r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), "", "*"))
	}
	if bucket.GetArtifact() != nil {
		return r.Storage.RemoveAllButCurrent(*bucket.GetArtifact())
	}
	return nil
}

// event emits a Kubernetes event, and forwards the event to the
// notification controller if configured.
func (r *BucketReconciler) event(ctx context.Context, bucket sourcev1.Bucket, severity, msg string) {
	log := logr.FromContext(ctx)
	if r.EventRecorder != nil {
		r.EventRecorder.Eventf(&bucket, "Normal", severity, msg)
	}
	if r.ExternalEventRecorder != nil {
		objRef, err := reference.GetReference(r.Scheme, &bucket)
		if err != nil {
			log.Error(err, "unable to send event")
			return
		}
		if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
			log.Error(err, "unable to send event")
			return
		}
	}
}

func (r *BucketReconciler) recordReadiness(ctx context.Context, bucket sourcev1.Bucket) {
	log := logr.FromContext(ctx)
	if r.MetricsRecorder == nil {
		return
	}
	objRef, err := reference.GetReference(r.Scheme, &bucket)
	if err != nil {
		log.Error(err, "unable to record readiness metric")
		return
	}
	if rc := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil {
		r.MetricsRecorder.RecordCondition(*objRef, *rc, !bucket.DeletionTimestamp.IsZero())
	} else {
		r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
			Type:   meta.ReadyCondition,
			Status: metav1.ConditionUnknown,
		}, !bucket.DeletionTimestamp.IsZero())
	}
}

func (r *BucketReconciler) recordSuspension(ctx context.Context, bucket sourcev1.Bucket) {
	if r.MetricsRecorder == nil {
		return
	}
	log := logr.FromContext(ctx)
	objRef, err := reference.GetReference(r.Scheme, &bucket)
	if err != nil {
		log.Error(err, "unable to record suspended metric")
		return
	}

	if !bucket.DeletionTimestamp.IsZero() {
		r.MetricsRecorder.RecordSuspend(*objRef, false)
	} else {
		r.MetricsRecorder.RecordSuspend(*objRef, bucket.Spec.Suspend)
	}
}

// updateStatus fetches the latest version of the Bucket and patches its
// status sub-resource with the given status.
func (r *BucketReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.BucketStatus) error {
	var bucket sourcev1.Bucket
	if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil {
		return err
	}

	patch := client.MergeFrom(bucket.DeepCopy())
	bucket.Status = newStatus

	return r.Status().Patch(ctx, &bucket, patch)
}
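// Illustrative sketch, not part of the controller: how a manager binary
// might wire up this reconciler. The real main.go in source-controller
// constructs the Storage and recorders from flags, so treat the concrete
// values and the function name below as assumptions.
func exampleSetup(mgr ctrl.Manager, storage *Storage) error {
	r := &BucketReconciler{
		Client:        mgr.GetClient(),
		Scheme:        mgr.GetScheme(),
		Storage:       storage,
		EventRecorder: mgr.GetEventRecorderFor("source-controller"),
		// ExternalEventRecorder and MetricsRecorder are optional; the
		// reconciler checks for nil before using them.
	}
	return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{
		// Allow a couple of Buckets to be reconciled in parallel.
		MaxConcurrentReconciles: 2,
	})
}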