Rewrite `BucketReconciler` to new standards
This commit rewrites the `BucketReconciler` to new standards, implementing the newly introduced Condition types and adhering more closely to the Kubernetes API conventions. More specifically, it introduces:

- Implementation of more explicit Condition types to highlight abnormalities.
- Extensive usage of the `conditions` subpackage from `runtime`.
- Better and more conflict-resilient (status) patching of reconciled objects using the `patch` subpackage from `runtime`.
- Proper implementation of kstatus' `Reconciling` and `Stalled` conditions.
- A refactor of the reconciler logic, including more efficient detection of changes to bucket objects by making use of the available etag data, and downloading of object files in parallel with a limited number of workers (4); both ideas are sketched below.
- Integration tests that rely solely on `testenv` and do not use Ginkgo.

There are a couple of TODOs marked in-code; these are suggestions for the future and should be non-blocking. In addition to the TODOs, more complex and/or edge-case test scenarios may be added as well.

Signed-off-by: Hidde Beydals <hello@hidde.co>
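The etag-based change detection mentioned above reduces to hashing a sorted key-to-etag index instead of hashing downloaded file contents. Below is a minimal, self-contained sketch of that idea; `bucketRevision` and the example index are illustrative stand-ins rather than the controller's actual API (the reconciler's own `revision` helper appears further down in the diff):

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

// bucketRevision computes a deterministic checksum over an object key -> etag
// index. Sorting the keys makes the result independent of listing order, so
// identical bucket contents always yield the same revision.
func bucketRevision(index map[string]string) (string, error) {
	keys := make([]string, 0, len(index))
	for k := range index {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	sum := sha256.New()
	for _, k := range keys {
		// One "key etag" pair per line feeds the running hash.
		if _, err := fmt.Fprintf(sum, "%s %s\n", k, index[k]); err != nil {
			return "", err
		}
	}
	return fmt.Sprintf("%x", sum.Sum(nil)), nil
}

func main() {
	// Hypothetical listing result: object keys mapped to their etags.
	index := map[string]string{
		"manifests/deploy.yaml":  "0cc175b9c0f1b6a831c399e269772661",
		"manifests/service.yaml": "92eb5ffee6ae2fec3ad71c777531578f",
	}
	rev, err := bucketRevision(index)
	if err != nil {
		panic(err)
	}
	// Any re-uploaded object changes its etag, which changes the revision,
	// without the controller having to download and hash file contents.
	fmt.Println(rev)
}
```

In the reconciler below, a revision that differs from the stored artifact's is what sets `ArtifactOutdatedCondition` and triggers the actual download.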
parent 5d43bcc054
commit 588ccbfe99
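The worker-limited parallel download mentioned in the commit message combines `errgroup` with a weighted semaphore. The sketch below shows the pattern in isolation; `fetchObject` is a hypothetical placeholder for the real minio `FGetObject` call made by the controller:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

// fetchObject stands in for a per-object download (e.g. minio's FGetObject);
// it is a hypothetical placeholder used only for this illustration.
func fetchObject(ctx context.Context, key string) error {
	select {
	case <-time.After(10 * time.Millisecond):
		fmt.Println("downloaded", key)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// downloadAll fetches all keys with at most `workers` downloads in flight.
func downloadAll(ctx context.Context, keys []string) error {
	const workers = 4
	sem := semaphore.NewWeighted(workers)
	group, groupCtx := errgroup.WithContext(ctx)

	for _, key := range keys {
		key := key // capture the loop variable for the goroutine
		// Block until a worker slot is free; Acquire fails only if the
		// group context was cancelled (e.g. another download errored).
		if err := sem.Acquire(groupCtx, 1); err != nil {
			break
		}
		group.Go(func() error {
			defer sem.Release(1)
			return fetchObject(groupCtx, key)
		})
	}
	// Wait returns the first non-nil error returned by any download.
	return group.Wait()
}

func main() {
	keys := []string{"a.yaml", "b.yaml", "c.yaml", "d.yaml", "e.yaml"}
	if err := downloadAll(context.Background(), keys); err != nil {
		fmt.Println("download failed:", err)
	}
}
```

Acquiring the semaphore before spawning each goroutine caps concurrency at four downloads, while `group.Wait()` surfaces the first error and the shared group context cancels the remaining workers.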
@@ -20,8 +20,6 @@ import (
	"time"

	"github.com/fluxcd/pkg/apis/meta"
	"github.com/fluxcd/pkg/runtime/conditions"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -30,6 +28,18 @@ const (
	BucketKind = "Bucket"
)

const (
	GenericBucketProvider string = "generic"
	AmazonBucketProvider  string = "aws"
)

const (
	// DownloadFailedCondition indicates a transient or persistent download failure. If True, observations on the
	// upstream Source revision are not possible, and the Artifact available for the Source may be outdated.
	// This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True.
	DownloadFailedCondition string = "DownloadFailed"
)

// BucketSpec defines the desired state of an S3 compatible bucket
type BucketSpec struct {
	// The S3 compatible storage provider name, default ('generic').
@@ -79,11 +89,6 @@ type BucketSpec struct {
	Suspend bool `json:"suspend,omitempty"`
}

const (
	GenericBucketProvider string = "generic"
	AmazonBucketProvider  string = "aws"
)

// BucketStatus defines the observed state of a bucket
type BucketStatus struct {
	// ObservedGeneration is the last observed generation.
@@ -115,45 +120,6 @@ const (
	BucketOperationFailedReason string = "BucketOperationFailed"
)

// BucketProgressing resets the conditions of the Bucket to metav1.Condition of
// type meta.ReadyCondition with status 'Unknown' and meta.ProgressingReason
// reason and message. It returns the modified Bucket.
func BucketProgressing(bucket Bucket) Bucket {
	bucket.Status.ObservedGeneration = bucket.Generation
	bucket.Status.URL = ""
	bucket.Status.Conditions = []metav1.Condition{}
	conditions.MarkUnknown(&bucket, meta.ReadyCondition, meta.ProgressingReason, "reconciliation in progress")
	return bucket
}

// BucketReady sets the given Artifact and URL on the Bucket and sets the
// meta.ReadyCondition to 'True', with the given reason and message. It returns
// the modified Bucket.
func BucketReady(bucket Bucket, artifact Artifact, url, reason, message string) Bucket {
	bucket.Status.Artifact = &artifact
	bucket.Status.URL = url
	conditions.MarkTrue(&bucket, meta.ReadyCondition, reason, message)
	return bucket
}

// BucketNotReady sets the meta.ReadyCondition on the Bucket to 'False', with
// the given reason and message. It returns the modified Bucket.
func BucketNotReady(bucket Bucket, reason, message string) Bucket {
	conditions.MarkFalse(&bucket, meta.ReadyCondition, reason, message)
	return bucket
}

// BucketReadyMessage returns the message of the metav1.Condition of type
// meta.ReadyCondition with status 'True' if present, or an empty string.
func BucketReadyMessage(bucket Bucket) string {
	if c := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); c != nil {
		if c.Status == metav1.ConditionTrue {
			return c.Message
		}
	}
	return ""
}

// GetConditions returns the status conditions of the object.
func (in Bucket) GetConditions() []metav1.Condition {
	return in.Status.Conditions
@@ -0,0 +1,67 @@
/*
Copyright 2021 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
	"fmt"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
	. "github.com/onsi/gomega"
	"github.com/onsi/gomega/types"
)

// MatchArtifact returns a custom matcher to check equality of a v1beta1.Artifact; the timestamp and URL are ignored.
func MatchArtifact(expected *sourcev1.Artifact) types.GomegaMatcher {
	return &matchArtifact{
		expected: expected,
	}
}

type matchArtifact struct {
	expected *sourcev1.Artifact
}

func (m matchArtifact) Match(actual interface{}) (success bool, err error) {
	actualArtifact, ok := actual.(*sourcev1.Artifact)
	if !ok {
		return false, fmt.Errorf("actual should be a pointer to an Artifact")
	}

	if ok, _ := BeNil().Match(m.expected); ok {
		return BeNil().Match(actual)
	}

	if ok, err = Equal(m.expected.Path).Match(actualArtifact.Path); !ok {
		return ok, err
	}
	if ok, err = Equal(m.expected.Revision).Match(actualArtifact.Revision); !ok {
		return ok, err
	}
	if ok, err = Equal(m.expected.Checksum).Match(actualArtifact.Checksum); !ok {
		return ok, err
	}

	return ok, err
}

func (m matchArtifact) FailureMessage(actual interface{}) (message string) {
	return fmt.Sprintf("expected\n\t%#v\nto match\n\t%#v\n", actual, m.expected)
}

func (m matchArtifact) NegatedFailureMessage(actual interface{}) (message string) {
	return fmt.Sprintf("expected\n\t%#v\nto not match\n\t%#v\n", actual, m.expected)
}
@@ -18,24 +18,23 @@ package controllers

import (
	"context"
	"crypto/sha1"
	"crypto/sha256"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/go-logr/logr"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/s3utils"
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
	corev1 "k8s.io/api/core/v1"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	kuberecorder "k8s.io/client-go/tools/record"
	"k8s.io/client-go/tools/reference"
	kerrors "k8s.io/apimachinery/pkg/util/errors"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -43,8 +42,10 @@ import (
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	"github.com/fluxcd/pkg/apis/meta"
	"github.com/fluxcd/pkg/runtime/conditions"
	helper "github.com/fluxcd/pkg/runtime/controller"
	"github.com/fluxcd/pkg/runtime/events"
	"github.com/fluxcd/pkg/runtime/metrics"
	"github.com/fluxcd/pkg/runtime/patch"
	"github.com/fluxcd/pkg/runtime/predicates"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
@@ -59,11 +60,10 @@
// BucketReconciler reconciles a Bucket object
type BucketReconciler struct {
	client.Client
	Scheme *runtime.Scheme
	Storage *Storage
	EventRecorder kuberecorder.EventRecorder
	ExternalEventRecorder *events.Recorder
	MetricsRecorder *metrics.Recorder
	helper.Events
	helper.Metrics

	Storage *Storage
}

type BucketReconcilerOptions struct {
@ -82,403 +82,497 @@ func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts Buc
|
|||
Complete(r)
|
||||
}
|
||||
|
||||
func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
|
||||
start := time.Now()
|
||||
log := ctrl.LoggerFrom(ctx)
|
||||
|
||||
var bucket sourcev1.Bucket
|
||||
if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil {
|
||||
// Fetch the Bucket
|
||||
obj := &sourcev1.Bucket{}
|
||||
if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
|
||||
return ctrl.Result{}, client.IgnoreNotFound(err)
|
||||
}
|
||||
|
||||
// Record suspended status metric
|
||||
defer r.recordSuspension(ctx, bucket)
|
||||
r.RecordSuspend(ctx, obj, obj.Spec.Suspend)
|
||||
|
||||
// Add our finalizer if it does not exist
|
||||
if !controllerutil.ContainsFinalizer(&bucket, sourcev1.SourceFinalizer) {
|
||||
controllerutil.AddFinalizer(&bucket, sourcev1.SourceFinalizer)
|
||||
if err := r.Update(ctx, &bucket); err != nil {
|
||||
log.Error(err, "unable to register finalizer")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Examine if the object is under deletion
|
||||
if !bucket.ObjectMeta.DeletionTimestamp.IsZero() {
|
||||
return r.reconcileDelete(ctx, bucket)
|
||||
}
|
||||
|
||||
// Return early if the object is suspended.
|
||||
if bucket.Spec.Suspend {
|
||||
// Return early if the object is suspended
|
||||
if obj.Spec.Suspend {
|
||||
log.Info("Reconciliation is suspended for this object")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// record reconciliation duration
|
||||
if r.MetricsRecorder != nil {
|
||||
objRef, err := reference.GetReference(r.Scheme, &bucket)
|
||||
if err != nil {
|
||||
return ctrl.Result{}, err
|
||||
// Initialize the patch helper
|
||||
patchHelper, err := patch.NewHelper(obj, r.Client)
|
||||
if err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Always attempt to patch the object and status after each reconciliation
|
||||
defer func() {
|
||||
// Record the value of the reconciliation request, if any
|
||||
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
|
||||
obj.Status.SetLastHandledReconcileRequest(v)
|
||||
}
|
||||
defer r.MetricsRecorder.RecordDuration(*objRef, start)
|
||||
}
|
||||
|
||||
// set initial status
|
||||
if resetBucket, ok := r.resetStatus(bucket); ok {
|
||||
bucket = resetBucket
|
||||
if err := r.updateStatus(ctx, req, bucket.Status); err != nil {
|
||||
log.Error(err, "unable to update status")
|
||||
return ctrl.Result{Requeue: true}, err
|
||||
// Summarize the Ready condition based on abnormalities that may have been observed
|
||||
conditions.SetSummary(obj,
|
||||
meta.ReadyCondition,
|
||||
conditions.WithConditions(
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
sourcev1.DownloadFailedCondition,
|
||||
sourcev1.ArtifactUnavailableCondition,
|
||||
),
|
||||
conditions.WithNegativePolarityConditions(
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
sourcev1.DownloadFailedCondition,
|
||||
sourcev1.ArtifactUnavailableCondition,
|
||||
),
|
||||
)
|
||||
|
||||
// Patch the object, ignoring conflicts on the conditions owned by this controller
|
||||
patchOpts := []patch.Option{
|
||||
patch.WithOwnedConditions{
|
||||
Conditions: []string{
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
sourcev1.DownloadFailedCondition,
|
||||
sourcev1.ArtifactUnavailableCondition,
|
||||
meta.ReadyCondition,
|
||||
meta.ReconcilingCondition,
|
||||
meta.StalledCondition,
|
||||
},
|
||||
},
|
||||
}
|
||||
r.recordReadiness(ctx, bucket)
|
||||
|
||||
// Determine if the resource is still being reconciled, or if it has stalled, and record this observation
|
||||
if retErr == nil && (result.IsZero() || !result.Requeue) {
|
||||
// We are no longer reconciling
|
||||
conditions.Delete(obj, meta.ReconcilingCondition)
|
||||
|
||||
// We have now observed this generation
|
||||
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
|
||||
|
||||
readyCondition := conditions.Get(obj, meta.ReadyCondition)
|
||||
switch readyCondition.Status {
|
||||
case metav1.ConditionFalse:
|
||||
// As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled
|
||||
conditions.MarkStalled(obj, readyCondition.Reason, readyCondition.Message)
|
||||
case metav1.ConditionTrue:
|
||||
// As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled
|
||||
conditions.Delete(obj, meta.StalledCondition)
|
||||
}
|
||||
}
|
||||
|
||||
// Finally, patch the resource
|
||||
if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
|
||||
retErr = kerrors.NewAggregate([]error{retErr, err})
|
||||
}
|
||||
|
||||
// Always record readiness and duration metrics
|
||||
r.Metrics.RecordReadiness(ctx, obj)
|
||||
r.Metrics.RecordDuration(ctx, obj, start)
|
||||
}()
|
||||
|
||||
// Add finalizer first if not exist to avoid the race condition between init and delete
|
||||
if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) {
|
||||
controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer)
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
}
|
||||
|
||||
// record the value of the reconciliation request, if any
|
||||
// TODO(hidde): would be better to defer this in combination with
|
||||
// always patching the status sub-resource after a reconciliation.
|
||||
if v, ok := meta.ReconcileAnnotationValue(bucket.GetAnnotations()); ok {
|
||||
bucket.Status.SetLastHandledReconcileRequest(v)
|
||||
// Examine if the object is under deletion
|
||||
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
|
||||
return r.reconcileDelete(ctx, obj)
|
||||
}
|
||||
|
||||
// purge old artifacts from storage
|
||||
if err := r.gc(bucket); err != nil {
|
||||
log.Error(err, "unable to purge old artifacts")
|
||||
}
|
||||
|
||||
// reconcile bucket by downloading its content
|
||||
reconciledBucket, reconcileErr := r.reconcile(ctx, *bucket.DeepCopy())
|
||||
|
||||
// update status with the reconciliation result
|
||||
if err := r.updateStatus(ctx, req, reconciledBucket.Status); err != nil {
|
||||
log.Error(err, "unable to update status")
|
||||
return ctrl.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
// if reconciliation failed, record the failure and requeue immediately
|
||||
if reconcileErr != nil {
|
||||
r.event(ctx, reconciledBucket, events.EventSeverityError, reconcileErr.Error())
|
||||
r.recordReadiness(ctx, reconciledBucket)
|
||||
return ctrl.Result{Requeue: true}, reconcileErr
|
||||
}
|
||||
|
||||
// emit revision change event
|
||||
if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision {
|
||||
r.event(ctx, reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
|
||||
}
|
||||
r.recordReadiness(ctx, reconciledBucket)
|
||||
|
||||
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
|
||||
time.Now().Sub(start).String(),
|
||||
bucket.GetRequeueAfter().String(),
|
||||
))
|
||||
|
||||
return ctrl.Result{RequeueAfter: bucket.GetRequeueAfter()}, nil
|
||||
// Reconcile actual object
|
||||
return r.reconcile(ctx, obj)
|
||||
}
|
||||
|
||||
func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) {
|
||||
s3Client, err := r.auth(ctx, bucket)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("auth error: %w", err)
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err
|
||||
// reconcile steps through the actual reconciliation tasks for the object; it returns early on the first step that
|
||||
// produces an error.
|
||||
func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
|
||||
// Mark the resource as under reconciliation
|
||||
conditions.MarkReconciling(obj, meta.ProgressingReason, "")
|
||||
|
||||
// Reconcile the storage data
|
||||
if result, err := r.reconcileStorage(ctx, obj); err != nil || result.IsZero() {
|
||||
return result, err
|
||||
}
|
||||
|
||||
// create tmp dir
|
||||
tempDir, err := os.MkdirTemp("", bucket.Name)
|
||||
// Create temp working dir
|
||||
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
|
||||
if err != nil {
|
||||
err = fmt.Errorf("tmp dir error: %w", err)
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason, "Failed to create temporary directory: %s", err)
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
|
||||
// Reconcile the source from upstream
|
||||
var artifact sourcev1.Artifact
|
||||
if result, err := r.reconcileSource(ctx, obj, &artifact, tmpDir); err != nil || result.IsZero() {
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, err
|
||||
}
|
||||
|
||||
// Reconcile the artifact to storage
|
||||
if result, err := r.reconcileArtifact(ctx, obj, artifact, tmpDir); err != nil || result.IsZero() {
|
||||
return result, err
|
||||
}
|
||||
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
|
||||
}
|
||||
|
||||
// reconcileStorage ensures the current state of the storage matches the desired and previously observed state.
|
||||
//
|
||||
// All artifacts for the resource except for the current one are garbage collected from the storage.
|
||||
// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
|
||||
// If the object does not have an artifact in its Status object, a v1beta1.ArtifactUnavailableCondition is set.
|
||||
// If the hostnames of the URLs on the object do not match the current storage server hostname, they are updated.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
|
||||
// Garbage collect previous advertised artifact(s) from storage
|
||||
_ = r.garbageCollect(ctx, obj)
|
||||
|
||||
// Determine if the advertised artifact is still in storage
|
||||
if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) {
|
||||
obj.Status.Artifact = nil
|
||||
obj.Status.URL = ""
|
||||
}
|
||||
|
||||
// Record that we do not have an artifact
|
||||
if obj.GetArtifact() == nil {
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage")
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
}
|
||||
conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
|
||||
|
||||
// Always update URLs to ensure hostname is up-to-date
|
||||
// TODO(hidde): we may want to send out an event only if we notice the URL has changed
|
||||
r.Storage.SetArtifactURL(obj.GetArtifact())
|
||||
obj.Status.URL = r.Storage.SetHostname(obj.Status.URL)
|
||||
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
|
||||
}
|
||||
|
||||
// reconcileSource ensures the upstream bucket can be reached and downloaded using the declared configuration, and
|
||||
// observes its state.
|
||||
//
|
||||
// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into
|
||||
// account. In case of an error during the download process (including transient errors), it records
|
||||
// v1beta1.DownloadFailedCondition=True and returns early.
|
||||
// On a successful download, it removes v1beta1.DownloadFailedCondition, and compares the current revision of HEAD to
|
||||
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
|
||||
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (ctrl.Result, error) {
|
||||
// Attempt to retrieve secret if one is configured
|
||||
var secret *corev1.Secret
|
||||
if obj.Spec.SecretRef != nil {
|
||||
secret = &corev1.Secret{}
|
||||
name := types.NamespacedName{
|
||||
Namespace: obj.GetNamespace(),
|
||||
Name: obj.Spec.SecretRef.Name,
|
||||
}
|
||||
if err := r.Client.Get(ctx, name, secret); err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.AuthenticationFailedReason,
|
||||
"Failed to get secret '%s': %s", name.String(), err.Error())
|
||||
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.AuthenticationFailedReason,
|
||||
"Failed to get secret '%s': %s", name.String(), err.Error())
|
||||
// Return error as the world as observed may change
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Build the client with the configuration from the object and secret
|
||||
s3Client, err := r.buildClient(obj, secret)
|
||||
if err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to construct S3 client: %s", err.Error())
|
||||
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to construct S3 client: %s", err.Error())
|
||||
// Return error as the contents of the secret may change
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Confirm bucket exists
|
||||
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
|
||||
defer cancel()
|
||||
|
||||
exists, err := s3Client.BucketExists(ctxTimeout, bucket.Spec.BucketName)
|
||||
exists, err := s3Client.BucketExists(ctxTimeout, obj.Spec.BucketName)
|
||||
if err != nil {
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
|
||||
conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to verify existence of bucket '%s': %s", obj.Spec.BucketName, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
if !exists {
|
||||
err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName)
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
|
||||
conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Bucket '%s' does not exist", obj.Spec.BucketName)
|
||||
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
|
||||
"Bucket '%s' does not exist", obj.Spec.BucketName)
|
||||
return ctrl.Result{}, fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName)
|
||||
}
|
||||
|
||||
// Look for file with ignore rules first
|
||||
// NB: S3 has flat filepath keys making it impossible to look
|
||||
// for files in "subdirectories" without building up a tree first.
|
||||
path := filepath.Join(tempDir, sourceignore.IgnoreFile)
|
||||
if err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
|
||||
path := filepath.Join(dir, sourceignore.IgnoreFile)
|
||||
if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
|
||||
if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" {
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
|
||||
conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
}
|
||||
ps, err := sourceignore.ReadIgnoreFile(path, nil)
|
||||
if err != nil {
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
|
||||
conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
// In-spec patterns take precedence
|
||||
if bucket.Spec.Ignore != nil {
|
||||
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
|
||||
if obj.Spec.Ignore != nil {
|
||||
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
|
||||
}
|
||||
matcher := sourceignore.NewMatcher(ps)
|
||||
|
||||
// download bucket content
|
||||
for object := range s3Client.ListObjects(ctxTimeout, bucket.Spec.BucketName, minio.ListObjectsOptions{
|
||||
// Build up an index of object keys and their etags
|
||||
// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
|
||||
// detect both structural and file changes
|
||||
index := map[string]string{}
|
||||
for object := range s3Client.ListObjects(ctxTimeout, obj.Spec.BucketName, minio.ListObjectsOptions{
|
||||
Recursive: true,
|
||||
UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
|
||||
}) {
|
||||
if object.Err != nil {
|
||||
err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, object.Err)
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
|
||||
if err = object.Err; err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
|
||||
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Ignore directories and the .sourceignore file
|
||||
if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile {
|
||||
continue
|
||||
}
|
||||
|
||||
// Ignore matches
|
||||
if matcher.Match(strings.Split(object.Key, "/"), false) {
|
||||
continue
|
||||
}
|
||||
|
||||
localPath := filepath.Join(tempDir, object.Key)
|
||||
err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Key, localPath, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
|
||||
}
|
||||
index[object.Key] = object.ETag
|
||||
}
|
||||
|
||||
revision, err := r.checksum(tempDir)
|
||||
// Calculate revision checksum from the collected index values
|
||||
revision, err := r.revision(index)
|
||||
if err != nil {
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// return early on unchanged revision
|
||||
artifact := r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), revision, fmt.Sprintf("%s.tar.gz", revision))
|
||||
if apimeta.IsStatusConditionTrue(bucket.Status.Conditions, meta.ReadyCondition) && bucket.GetArtifact().HasRevision(artifact.Revision) {
|
||||
if artifact.URL != bucket.GetArtifact().URL {
|
||||
r.Storage.SetArtifactURL(bucket.GetArtifact())
|
||||
bucket.Status.URL = r.Storage.SetHostname(bucket.Status.URL)
|
||||
if !obj.GetArtifact().HasRevision(revision) {
|
||||
// Mark observations about the revision on the object
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision",
|
||||
"New upstream revision '%s'", revision)
|
||||
|
||||
// Download the files in parallel, but with a limited number of workers
|
||||
group, groupCtx := errgroup.WithContext(ctx)
|
||||
group.Go(func() error {
|
||||
const workers = 4
|
||||
sem := semaphore.NewWeighted(workers)
|
||||
for key := range index {
|
||||
k := key
|
||||
if err := sem.Acquire(groupCtx, 1); err != nil {
|
||||
return err
|
||||
}
|
||||
group.Go(func() error {
|
||||
defer sem.Release(1)
|
||||
localPath := filepath.Join(dir, k)
|
||||
if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath, minio.GetObjectOptions{}); err != nil {
|
||||
return fmt.Errorf("failed to get '%s' file: %w", k, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err = group.Wait(); err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
|
||||
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
|
||||
"Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
return bucket, nil
|
||||
r.Eventf(ctx, obj, events.EventSeverityInfo, sourcev1.BucketOperationSucceedReason,
|
||||
"Downloaded %d files from bucket '%s' revision '%s'", len(index), obj.Spec.BucketName, revision)
|
||||
}
|
||||
conditions.Delete(obj, sourcev1.DownloadFailedCondition)
|
||||
|
||||
// Create potential new artifact
|
||||
*artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
|
||||
}
|
||||
|
||||
// reconcileArtifact archives a new artifact to the storage if the current observation on the object does not match the
|
||||
// given data.
|
||||
//
|
||||
// The inspection of the given data to the object is deferred, ensuring any stale observations such as
|
||||
// v1beta1.ArtifactUnavailableCondition and v1beta1.ArtifactOutdatedCondition are always deleted.
|
||||
// If the given artifact does not differ from the object's current, it returns early.
|
||||
// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
|
||||
// updated to its path.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) (ctrl.Result, error) {
|
||||
// Always restore the Ready condition in case it got removed due to a transient error
|
||||
defer func() {
|
||||
if obj.GetArtifact() != nil {
|
||||
conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
|
||||
}
|
||||
if obj.GetArtifact().HasRevision(artifact.Revision) {
|
||||
conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
|
||||
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason,
|
||||
"Stored artifact for revision '%s'", artifact.Revision)
|
||||
}
|
||||
}()
|
||||
|
||||
// The artifact is up-to-date
|
||||
if obj.GetArtifact().HasRevision(artifact.Revision) {
|
||||
ctrl.LoggerFrom(ctx).Info(fmt.Sprintf("Already up to date, current revision '%s'", artifact.Revision))
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
|
||||
}
|
||||
|
||||
// create artifact dir
|
||||
err = r.Storage.MkdirAll(artifact)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("mkdir dir error: %w", err)
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
// Ensure target path exists and is a directory
|
||||
if f, err := os.Stat(dir); err != nil {
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to stat source path")
|
||||
return ctrl.Result{}, err
|
||||
} else if !f.IsDir() {
|
||||
err := fmt.Errorf("source path '%s' is not a directory", dir)
|
||||
ctrl.LoggerFrom(ctx).Error(err, "invalid target path")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// acquire lock
|
||||
// Ensure artifact directory exists and acquire lock
|
||||
if err := r.Storage.MkdirAll(artifact); err != nil {
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to create artifact directory")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
unlock, err := r.Storage.Lock(artifact)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("unable to acquire lock: %w", err)
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to acquire lock for artifact")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
defer unlock()
|
||||
|
||||
// archive artifact and check integrity
|
||||
if err := r.Storage.Archive(&artifact, tempDir, nil); err != nil {
|
||||
err = fmt.Errorf("storage archive error: %w", err)
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
// Archive directory to storage
|
||||
if err := r.Storage.Archive(&artifact, dir, nil); err != nil {
|
||||
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason,
|
||||
"Unable to archive artifact to storage: %s", err)
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
r.Events.EventWithMetaf(ctx, obj, map[string]string{
|
||||
"revision": artifact.Revision,
|
||||
"checksum": artifact.Checksum,
|
||||
}, events.EventSeverityInfo, "NewArtifact", "Stored artifact for revision '%s'", artifact.Revision)
|
||||
|
||||
// update latest symlink
|
||||
// Record it on the object
|
||||
obj.Status.Artifact = artifact.DeepCopy()
|
||||
|
||||
// Update symlink on a "best effort" basis
|
||||
url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
|
||||
if err != nil {
|
||||
err = fmt.Errorf("storage symlink error: %w", err)
|
||||
return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
r.Events.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason,
|
||||
"Failed to update status URL symlink: %s", err)
|
||||
}
|
||||
|
||||
message := fmt.Sprintf("Fetched revision: %s", artifact.Revision)
|
||||
return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil
|
||||
if url != "" {
|
||||
obj.Status.URL = url
|
||||
}
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
|
||||
}
|
||||
|
||||
func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {
|
||||
if err := r.gc(bucket); err != nil {
|
||||
r.event(ctx, bucket, events.EventSeverityError,
|
||||
fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
|
||||
// reconcileDelete handles the deletion of an object. It first garbage collects all artifacts for the object from the
|
||||
// artifact storage; if successful, the finalizer is removed from the object.
|
||||
func (r *BucketReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
|
||||
// Garbage collect the resource's artifacts
|
||||
if err := r.garbageCollect(ctx, obj); err != nil {
|
||||
// Return the error so we retry the failed garbage collection
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Record deleted status
|
||||
r.recordReadiness(ctx, bucket)
|
||||
|
||||
// Remove our finalizer from the list and update it
|
||||
controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer)
|
||||
if err := r.Update(ctx, &bucket); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
// Remove our finalizer from the list
|
||||
controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
|
||||
|
||||
// Stop reconciliation as the object is being deleted
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
func (r *BucketReconciler) auth(ctx context.Context, bucket sourcev1.Bucket) (*minio.Client, error) {
|
||||
opt := minio.Options{
|
||||
Region: bucket.Spec.Region,
|
||||
Secure: !bucket.Spec.Insecure,
|
||||
}
|
||||
|
||||
if bucket.Spec.SecretRef != nil {
|
||||
secretName := types.NamespacedName{
|
||||
Namespace: bucket.GetNamespace(),
|
||||
Name: bucket.Spec.SecretRef.Name,
|
||||
}
|
||||
|
||||
var secret corev1.Secret
|
||||
if err := r.Get(ctx, secretName, &secret); err != nil {
|
||||
return nil, fmt.Errorf("credentials secret error: %w", err)
|
||||
}
|
||||
|
||||
accesskey := ""
|
||||
secretkey := ""
|
||||
if k, ok := secret.Data["accesskey"]; ok {
|
||||
accesskey = string(k)
|
||||
}
|
||||
if k, ok := secret.Data["secretkey"]; ok {
|
||||
secretkey = string(k)
|
||||
}
|
||||
if accesskey == "" || secretkey == "" {
|
||||
return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
|
||||
}
|
||||
opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "")
|
||||
} else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider {
|
||||
opt.Creds = credentials.NewIAM("")
|
||||
}
|
||||
|
||||
if opt.Creds == nil {
|
||||
return nil, fmt.Errorf("no bucket credentials found")
|
||||
}
|
||||
|
||||
return minio.New(bucket.Spec.Endpoint, &opt)
|
||||
}
|
||||
|
||||
// checksum calculates the SHA1 checksum of the given root directory.
|
||||
// It traverses the given root directory and calculates the checksum for any found file, and returns the SHA1 sum of the
|
||||
// list with relative file paths and their checksums.
|
||||
func (r *BucketReconciler) checksum(root string) (string, error) {
|
||||
sum := sha1.New()
|
||||
if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
// garbageCollect performs a garbage collection for the given v1beta1.Bucket. It removes all but the current
|
||||
// artifact except for when the deletion timestamp is set, which will result in the removal of all artifacts for the
|
||||
// resource.
|
||||
func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Bucket) error {
|
||||
if !obj.DeletionTimestamp.IsZero() {
|
||||
if err := r.Storage.RemoveAll(r.Storage.NewArtifactFor(obj.Kind, obj.GetObjectMeta(), "", "*")); err != nil {
|
||||
r.Eventf(ctx, obj, events.EventSeverityError, "GarbageCollectionFailed",
|
||||
"Garbage collection for deleted resource failed: %s", err)
|
||||
return err
|
||||
}
|
||||
if !info.Mode().IsRegular() {
|
||||
return nil
|
||||
}
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
relPath, err := filepath.Rel(root, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sum.Write([]byte(fmt.Sprintf("%x %s\n", sha1.Sum(data), relPath)))
|
||||
obj.Status.Artifact = nil
|
||||
// TODO(hidde): we should only push this event if we actually garbage collected something
|
||||
r.Eventf(ctx, obj, events.EventSeverityInfo, "GarbageCollectionSucceeded",
|
||||
"Garbage collected artifacts for deleted resource")
|
||||
return nil
|
||||
}); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return fmt.Sprintf("%x", sum.Sum(nil)), nil
|
||||
}
|
||||
|
||||
// resetStatus returns a modified v1beta1.Bucket and a boolean indicating
|
||||
// if the status field has been reset.
|
||||
func (r *BucketReconciler) resetStatus(bucket sourcev1.Bucket) (sourcev1.Bucket, bool) {
|
||||
// We do not have an artifact, or it does no longer exist
|
||||
if bucket.GetArtifact() == nil || !r.Storage.ArtifactExist(*bucket.GetArtifact()) {
|
||||
bucket = sourcev1.BucketProgressing(bucket)
|
||||
bucket.Status.Artifact = nil
|
||||
return bucket, true
|
||||
}
|
||||
if bucket.Generation != bucket.Status.ObservedGeneration {
|
||||
return sourcev1.BucketProgressing(bucket), true
|
||||
}
|
||||
return bucket, false
|
||||
}
|
||||
|
||||
// gc performs a garbage collection for the given v1beta1.Bucket.
|
||||
// It removes all but the current artifact except for when the
|
||||
// deletion timestamp is set, which will result in the removal of
|
||||
// all artifacts for the resource.
|
||||
func (r *BucketReconciler) gc(bucket sourcev1.Bucket) error {
|
||||
if !bucket.DeletionTimestamp.IsZero() {
|
||||
return r.Storage.RemoveAll(r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), "", "*"))
|
||||
}
|
||||
if bucket.GetArtifact() != nil {
|
||||
return r.Storage.RemoveAllButCurrent(*bucket.GetArtifact())
|
||||
if obj.GetArtifact() != nil {
|
||||
if err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
|
||||
r.Eventf(ctx, obj, events.EventSeverityError, "GarbageCollectionFailed", "Garbage collection of old artifacts failed: %s", err)
|
||||
return err
|
||||
}
|
||||
// TODO(hidde): we should only push this event if we actually garbage collected something
|
||||
r.Eventf(ctx, obj, events.EventSeverityInfo, "GarbageCollectionSucceeded", "Garbage collected old artifacts")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// event emits a Kubernetes event and forwards the event to notification controller if configured
|
||||
func (r *BucketReconciler) event(ctx context.Context, bucket sourcev1.Bucket, severity, msg string) {
|
||||
log := logr.FromContext(ctx)
|
||||
if r.EventRecorder != nil {
|
||||
r.EventRecorder.Eventf(&bucket, "Normal", severity, msg)
|
||||
// buildClient constructs a minio.Client with the data from the given object and secret.
|
||||
// It returns an error if the given Secret does not have the required fields, or if there is no credential handler
|
||||
// configured.
|
||||
func (r *BucketReconciler) buildClient(obj *sourcev1.Bucket, secret *corev1.Secret) (*minio.Client, error) {
|
||||
opts := minio.Options{
|
||||
Region: obj.Spec.Region,
|
||||
Secure: !obj.Spec.Insecure,
|
||||
}
|
||||
if r.ExternalEventRecorder != nil {
|
||||
objRef, err := reference.GetReference(r.Scheme, &bucket)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to send event")
|
||||
return
|
||||
if secret != nil {
|
||||
var accessKey, secretKey string
|
||||
if k, ok := secret.Data["accesskey"]; ok {
|
||||
accessKey = string(k)
|
||||
}
|
||||
if k, ok := secret.Data["secretkey"]; ok {
|
||||
secretKey = string(k)
|
||||
}
|
||||
if accessKey == "" || secretKey == "" {
|
||||
return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
|
||||
}
|
||||
opts.Creds = credentials.NewStaticV4(accessKey, secretKey, "")
|
||||
} else if obj.Spec.Provider == sourcev1.AmazonBucketProvider {
|
||||
opts.Creds = credentials.NewIAM("")
|
||||
}
|
||||
|
||||
if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
|
||||
log.Error(err, "unable to send event")
|
||||
return
|
||||
return minio.New(obj.Spec.Endpoint, &opts)
|
||||
}
|
||||
|
||||
// revision calculates the SHA256 checksum of the given string map.
|
||||
// The keys are sorted to ensure a stable order, and the SHA256 sum is then calculated for the string representations of
|
||||
// the key/value pairs, each pair written on a newline. The sum result is returned as a string.
|
||||
func (r *BucketReconciler) revision(list map[string]string) (string, error) {
|
||||
keyIndex := make([]string, 0, len(list))
|
||||
for k := range list {
|
||||
keyIndex = append(keyIndex, k)
|
||||
}
|
||||
sort.Strings(keyIndex)
|
||||
sum := sha256.New()
|
||||
for _, k := range keyIndex {
|
||||
if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, list[k]))); err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *BucketReconciler) recordReadiness(ctx context.Context, bucket sourcev1.Bucket) {
|
||||
log := logr.FromContext(ctx)
|
||||
if r.MetricsRecorder == nil {
|
||||
return
|
||||
}
|
||||
objRef, err := reference.GetReference(r.Scheme, &bucket)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to record readiness metric")
|
||||
return
|
||||
}
|
||||
if rc := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil {
|
||||
r.MetricsRecorder.RecordCondition(*objRef, *rc, !bucket.DeletionTimestamp.IsZero())
|
||||
} else {
|
||||
r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
|
||||
Type: meta.ReadyCondition,
|
||||
Status: metav1.ConditionUnknown,
|
||||
}, !bucket.DeletionTimestamp.IsZero())
|
||||
}
|
||||
}
|
||||
|
||||
func (r *BucketReconciler) recordSuspension(ctx context.Context, bucket sourcev1.Bucket) {
|
||||
if r.MetricsRecorder == nil {
|
||||
return
|
||||
}
|
||||
log := logr.FromContext(ctx)
|
||||
|
||||
objRef, err := reference.GetReference(r.Scheme, &bucket)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to record suspended metric")
|
||||
return
|
||||
}
|
||||
|
||||
if !bucket.DeletionTimestamp.IsZero() {
|
||||
r.MetricsRecorder.RecordSuspend(*objRef, false)
|
||||
} else {
|
||||
r.MetricsRecorder.RecordSuspend(*objRef, bucket.Spec.Suspend)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *BucketReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.BucketStatus) error {
|
||||
var bucket sourcev1.Bucket
|
||||
if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
patch := client.MergeFrom(bucket.DeepCopy())
|
||||
bucket.Status = newStatus
|
||||
|
||||
return r.Status().Patch(ctx, &bucket, patch)
|
||||
return fmt.Sprintf("%x", sum.Sum(nil)), nil
|
||||
}
|
||||
|
|
|
@@ -17,59 +17,567 @@ limitations under the License.
package controllers

import (
	"context"
	"crypto/md5"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"github.com/go-logr/logr"
	. "github.com/onsi/gomega"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
	"sigs.k8s.io/controller-runtime/pkg/log"

	"github.com/fluxcd/pkg/apis/meta"
	"github.com/fluxcd/pkg/runtime/conditions"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
)

func TestBucketReconciler_checksum(t *testing.T) {
|
||||
func TestBucketReconciler_Reconcile(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
|
||||
s3Server := newS3Server("test-bucket")
|
||||
s3Server.Objects = []*s3MockObject{
|
||||
{
|
||||
Key: "test.yaml",
|
||||
Content: []byte("test"),
|
||||
ContentType: "text/plain",
|
||||
LastModified: time.Now(),
|
||||
},
|
||||
}
|
||||
s3Server.Start()
|
||||
defer s3Server.Stop()
|
||||
|
||||
g.Expect(s3Server.HTTPAddress()).ToNot(BeEmpty())
|
||||
u, err := url.Parse(s3Server.HTTPAddress())
|
||||
g.Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
secret := &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
GenerateName: "bucket-reconcile-",
|
||||
Namespace: "default",
|
||||
},
|
||||
Data: map[string][]byte{
|
||||
"accesskey": []byte("key"),
|
||||
"secretkey": []byte("secret"),
|
||||
},
|
||||
}
|
||||
g.Expect(testEnv.Create(ctx, secret)).To(Succeed())
|
||||
defer testEnv.Delete(ctx, secret)
|
||||
|
||||
obj := &sourcev1.Bucket{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
GenerateName: "bucket-reconcile-",
|
||||
Namespace: "default",
|
||||
},
|
||||
Spec: sourcev1.BucketSpec{
|
||||
Provider: "generic",
|
||||
BucketName: s3Server.BucketName,
|
||||
Endpoint: u.Host,
|
||||
Insecure: true,
|
||||
Interval: metav1.Duration{Duration: interval},
|
||||
Timeout: &metav1.Duration{Duration: timeout},
|
||||
SecretRef: &meta.LocalObjectReference{
|
||||
Name: secret.Name,
|
||||
},
|
||||
},
|
||||
}
|
||||
g.Expect(testEnv.Create(ctx, obj)).To(Succeed())
|
||||
|
||||
key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace}
|
||||
|
||||
// Wait for finalizer to be set
|
||||
g.Eventually(func() bool {
|
||||
if err := testEnv.Get(ctx, key, obj); err != nil {
|
||||
return false
|
||||
}
|
||||
return len(obj.Finalizers) > 0
|
||||
}, timeout).Should(BeTrue())
|
||||
|
||||
// Wait for Bucket to be Ready
|
||||
g.Eventually(func() bool {
|
||||
if err := testEnv.Get(ctx, key, obj); err != nil {
|
||||
return false
|
||||
}
|
||||
if !conditions.IsReady(obj) || obj.Status.Artifact == nil {
|
||||
return false
|
||||
}
|
||||
readyCondition := conditions.Get(obj, meta.ReadyCondition)
|
||||
return obj.Generation == readyCondition.ObservedGeneration &&
|
||||
obj.Generation == obj.Status.ObservedGeneration
|
||||
}, timeout).Should(BeTrue())
|
||||
|
||||
g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
|
||||
|
||||
// Wait for Bucket to be deleted
|
||||
g.Eventually(func() bool {
|
||||
if err := testEnv.Get(ctx, key, obj); err != nil {
|
||||
return apierrors.IsNotFound(err)
|
||||
}
|
||||
return false
|
||||
}, timeout).Should(BeTrue())
|
||||
}
|
||||
|
||||
func TestBucketReconciler_reconcileStorage(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
beforeFunc func(root string)
|
||||
want string
|
||||
wantErr bool
|
||||
name string
|
||||
beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error
|
||||
want ctrl.Result
|
||||
wantErr bool
|
||||
assertArtifact *sourcev1.Artifact
|
||||
assertConditions []metav1.Condition
|
||||
assertPaths []string
|
||||
}{
|
||||
{
|
||||
name: "empty root",
|
||||
want: "da39a3ee5e6b4b0d3255bfef95601890afd80709",
|
||||
name: "garbage collects",
|
||||
beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
|
||||
revisions := []string{"a", "b", "c"}
|
||||
for n := range revisions {
|
||||
v := revisions[n]
|
||||
obj.Status.Artifact = &sourcev1.Artifact{
|
||||
Path: fmt.Sprintf("/reconcile-storage/%s.txt", v),
|
||||
Revision: v,
|
||||
}
|
||||
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
testStorage.SetArtifactURL(obj.Status.Artifact)
|
||||
return nil
|
||||
},
|
||||
assertArtifact: &sourcev1.Artifact{
|
||||
Path: "/reconcile-storage/c.txt",
|
||||
Revision: "c",
|
||||
Checksum: "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4",
|
||||
URL: testStorage.Hostname + "/reconcile-storage/c.txt",
|
||||
},
|
||||
assertPaths: []string{
|
||||
"/reconcile-storage/c.txt",
|
||||
"!/reconcile-storage/b.txt",
|
||||
"!/reconcile-storage/a.txt",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "with file",
|
||||
beforeFunc: func(root string) {
|
||||
mockFile(root, "a/b/c.txt", "a dummy string")
|
||||
name: "notices missing artifact in storage",
|
||||
beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
|
||||
obj.Status.Artifact = &sourcev1.Artifact{
|
||||
Path: fmt.Sprintf("/reconcile-storage/invalid.txt"),
|
||||
Revision: "d",
|
||||
}
|
||||
testStorage.SetArtifactURL(obj.Status.Artifact)
|
||||
return nil
|
||||
},
|
||||
want: ctrl.Result{Requeue: true},
|
||||
assertPaths: []string{
|
||||
"!/reconcile-storage/invalid.txt",
|
||||
},
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage"),
|
||||
},
|
||||
want: "309a5e6e96b4a7eea0d1cfaabf1be8ec1c063fa0",
|
||||
},
|
||||
{
|
||||
name: "with file in different path",
|
||||
beforeFunc: func(root string) {
|
||||
mockFile(root, "a/b.txt", "a dummy string")
|
||||
name: "updates hostname on diff from current",
|
||||
beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
|
||||
obj.Status.Artifact = &sourcev1.Artifact{
|
||||
Path: fmt.Sprintf("/reconcile-storage/hostname.txt"),
|
||||
Revision: "f",
|
||||
Checksum: "971c419dd609331343dee105fffd0f4608dc0bf2",
|
||||
URL: "http://outdated.com/reconcile-storage/hostname.txt",
|
||||
}
|
||||
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
},
|
||||
assertPaths: []string{
|
||||
"/reconcile-storage/hostname.txt",
|
||||
},
|
||||
assertArtifact: &sourcev1.Artifact{
|
||||
Path: "/reconcile-storage/hostname.txt",
|
||||
Revision: "f",
|
||||
Checksum: "971c419dd609331343dee105fffd0f4608dc0bf2",
|
||||
URL: testStorage.Hostname + "/reconcile-storage/hostname.txt",
|
||||
},
|
||||
want: "e28c62b5cc488849950c4355dddc5523712616d4",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
root, err := os.MkdirTemp("", "bucket-checksum-")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
g := NewWithT(t)
|
||||
|
||||
r := &BucketReconciler{
|
||||
Storage: testStorage,
|
||||
}
|
||||
|
||||
obj := &sourcev1.Bucket{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
GenerateName: "test-",
|
||||
},
|
||||
}
|
||||
defer os.RemoveAll(root)
|
||||
if tt.beforeFunc != nil {
|
||||
tt.beforeFunc(root)
|
||||
g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
|
||||
}
|
||||
got, err := (&BucketReconciler{}).checksum(root)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("checksum() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
|
||||
got, err := r.reconcileStorage(context.TODO(), obj)
|
||||
g.Expect(err != nil).To(Equal(tt.wantErr))
|
||||
g.Expect(got).To(Equal(tt.want))
|
||||
|
||||
g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact))
|
||||
if tt.assertArtifact != nil && tt.assertArtifact.URL != "" {
|
||||
g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL))
|
||||
}
|
||||
if got != tt.want {
|
||||
t.Errorf("checksum() got = %v, want %v", got, tt.want)
|
||||
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
|
||||
|
||||
for _, p := range tt.assertPaths {
|
||||
absoluteP := filepath.Join(testStorage.BasePath, p)
|
||||
if !strings.HasPrefix(p, "!") {
|
||||
g.Expect(absoluteP).To(BeAnExistingFile())
|
||||
continue
|
||||
}
|
||||
g.Expect(absoluteP).NotTo(BeAnExistingFile())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBucketReconciler_reconcileSource(t *testing.T) {
	tests := []struct {
		name             string
		bucketName       string
		bucketObjects    []*s3MockObject
		middleware       http.Handler
		secret           *corev1.Secret
		beforeFunc       func(obj *sourcev1.Bucket)
		want             ctrl.Result
		wantErr          bool
		assertArtifact   sourcev1.Artifact
		assertConditions []metav1.Condition
	}{
		{
			name:       "reconciles source",
			bucketName: "dummy",
			bucketObjects: []*s3MockObject{
				{
					Key:          "test.txt",
					Content:      []byte("test"),
					ContentType:  "text/plain",
					LastModified: time.Now(),
				},
			},
			assertArtifact: sourcev1.Artifact{
				Path:     "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz",
				Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8",
			},
			assertConditions: []metav1.Condition{
				*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"),
			},
		},
		// TODO(hidde): middleware for mock server
		//{
		//	name:       "authenticates using secretRef",
		//	bucketName: "dummy",
		//},
		{
			name:       "observes non-existing secretRef",
			bucketName: "dummy",
			beforeFunc: func(obj *sourcev1.Bucket) {
				obj.Spec.SecretRef = &meta.LocalObjectReference{
					Name: "dummy",
				}
			},
			wantErr: true,
			assertConditions: []metav1.Condition{
				*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.AuthenticationFailedReason, "Failed to get secret '/dummy': secrets \"dummy\" not found"),
			},
		},
		{
			name:       "observes invalid secretRef",
			bucketName: "dummy",
			secret: &corev1.Secret{
				ObjectMeta: metav1.ObjectMeta{
					Name: "dummy",
				},
			},
			beforeFunc: func(obj *sourcev1.Bucket) {
				obj.Spec.SecretRef = &meta.LocalObjectReference{
					Name: "dummy",
				}
			},
			wantErr: true,
			assertConditions: []metav1.Condition{
				*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to construct S3 client: invalid 'dummy' secret data: required fields"),
			},
		},
		{
			name:       "observes non-existing bucket name",
			bucketName: "dummy",
			beforeFunc: func(obj *sourcev1.Bucket) {
				obj.Spec.BucketName = "invalid"
			},
			wantErr: true,
			assertConditions: []metav1.Condition{
				*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Bucket 'invalid' does not exist"),
			},
		},
		{
			name: "transient bucket name API failure",
			beforeFunc: func(obj *sourcev1.Bucket) {
				obj.Spec.Endpoint = "transient.example.com"
				obj.Spec.BucketName = "unavailable"
			},
			wantErr: true,
			assertConditions: []metav1.Condition{
				*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to verify existence of bucket 'unavailable'"),
			},
		},
		{
			// TODO(hidde): test the lesser happy paths
			name:       ".sourceignore",
			bucketName: "dummy",
			bucketObjects: []*s3MockObject{
				{
					Key:          ".sourceignore",
					Content:      []byte("ignored/file.txt"),
					ContentType:  "text/plain",
					LastModified: time.Now(),
				},
				{
					Key:          "ignored/file.txt",
					Content:      []byte("ignored/file.txt"),
					ContentType:  "text/plain",
					LastModified: time.Now(),
				},
				{
					Key:          "included/file.txt",
					Content:      []byte("included/file.txt"),
					ContentType:  "text/plain",
					LastModified: time.Now(),
				},
			},
			assertArtifact: sourcev1.Artifact{
				Path:     "bucket/test-bucket/94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100.tar.gz",
				Revision: "94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100",
			},
			assertConditions: []metav1.Condition{
				*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"),
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			g := NewWithT(t)

			builder := fakeclient.NewClientBuilder().WithScheme(testEnv.Scheme())
			if tt.secret != nil {
				builder.WithObjects(tt.secret)
			}
			r := &BucketReconciler{
				Client:  builder.Build(),
				Storage: testStorage,
			}
			tmpDir, err := os.MkdirTemp("", "reconcile-bucket-source-")
			g.Expect(err).ToNot(HaveOccurred())
			defer os.RemoveAll(tmpDir)

			obj := &sourcev1.Bucket{
				TypeMeta: metav1.TypeMeta{
					Kind: sourcev1.BucketKind,
				},
				ObjectMeta: metav1.ObjectMeta{
					Name: "test-bucket",
				},
				Spec: sourcev1.BucketSpec{
					Timeout: &metav1.Duration{Duration: timeout},
				},
			}

			var server *s3MockServer
			if tt.bucketName != "" {
				server = newS3Server(tt.bucketName)
				server.Objects = tt.bucketObjects
				server.Start()
				defer server.Stop()

				g.Expect(server.HTTPAddress()).ToNot(BeEmpty())
				u, err := url.Parse(server.HTTPAddress())
				g.Expect(err).NotTo(HaveOccurred())

				obj.Spec.BucketName = tt.bucketName
				obj.Spec.Endpoint = u.Host
				// TODO(hidde): also test TLS
				obj.Spec.Insecure = true
			}
			if tt.beforeFunc != nil {
				tt.beforeFunc(obj)
			}

			artifact := &sourcev1.Artifact{}
			got, err := r.reconcileSource(context.TODO(), obj, artifact, tmpDir)
			g.Expect(err != nil).To(Equal(tt.wantErr))
			g.Expect(got).To(Equal(tt.want))

			g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy()))
			g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
		})
	}
}

func TestBucketReconciler_reconcileArtifact(t *testing.T) {
	tests := []struct {
		name             string
		artifact         sourcev1.Artifact
		beforeFunc       func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string)
		want             ctrl.Result
		wantErr          bool
		assertConditions []metav1.Condition
	}{
		{
			name: "artifact revision up-to-date",
			artifact: sourcev1.Artifact{
				Revision: "existing",
			},
			beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
				obj.Status.Artifact = &artifact
			},
			assertConditions: []metav1.Condition{
				*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
			},
		},
		{
			name: "dir path deleted",
			beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
				_ = os.RemoveAll(dir)
			},
			wantErr: true,
		},
		//{
		//	name: "dir path empty",
		//},
		//{
		//	name: "success",
		//	artifact: sourcev1.Artifact{
		//		Revision: "existing",
		//	},
		//	beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
		//		obj.Status.Artifact = &artifact
		//	},
		//	assertConditions: []metav1.Condition{
		//		*conditions.TrueCondition(sourcev1.ArtifactAvailableCondition, meta.SucceededReason, "Compressed source to artifact with revision 'existing'"),
		//	},
		//},
		//{
		//	name: "symlink",
		//},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			g := NewWithT(t)

			tmpDir, err := os.MkdirTemp("", "reconcile-bucket-artifact-")
			g.Expect(err).ToNot(HaveOccurred())
			defer os.RemoveAll(tmpDir)

			obj := &sourcev1.Bucket{
				TypeMeta: metav1.TypeMeta{
					Kind: sourcev1.BucketKind,
				},
				ObjectMeta: metav1.ObjectMeta{
					Name: "test-bucket",
				},
				Spec: sourcev1.BucketSpec{
					Timeout: &metav1.Duration{Duration: timeout},
				},
			}

			if tt.beforeFunc != nil {
				tt.beforeFunc(obj, tt.artifact, tmpDir)
			}

			r := &BucketReconciler{
				Storage: testStorage,
			}

			got, err := r.reconcileArtifact(logr.NewContext(ctx, log.NullLogger{}), obj, tt.artifact, tmpDir)
			g.Expect(err != nil).To(Equal(tt.wantErr))
			g.Expect(got).To(Equal(tt.want))

			//g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy()))
			g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
		})
	}
}

func TestBucketReconciler_revision(t *testing.T) {
	tests := []struct {
		name    string
		list    map[string]string
		want    string
		wantErr bool
	}{
		{
			name: "list with items",
			list: map[string]string{
				"one":   "one",
				"two":   "two",
				"three": "three",
			},
			want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc",
		},
		{
			name: "list with items in different order",
			list: map[string]string{
				"three": "three",
				"one":   "one",
				"two":   "two",
			},
			want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc",
		},
		{
			name: "empty list",
			list: map[string]string{},
			want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
		},
		{
			name: "nil list",
			list: nil,
			want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := (&BucketReconciler{}).revision(tt.list)
			if (err != nil) != tt.wantErr {
				t.Errorf("revision() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if got != tt.want {
				t.Errorf("revision() got = %v, want %v", got, tt.want)
			}
		})
	}
}
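
For context on the fixtures above: the empty and nil cases expect the SHA256 digest of zero bytes (e3b0c442…b855), and the two populated cases expect identical digests regardless of map iteration order, which implies a deterministic, order-independent hash over the key/etag pairs. The sketch below illustrates one sort-then-hash approach with those properties; the helper name and the exact byte serialization are assumptions (the `revision()` implementation itself is not part of this hunk), so the populated-list digests here are not guaranteed to match the fixtures.

package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

// revisionSketch is a hypothetical, order-independent digest over a key/etag map.
// Illustration only; it is not the implementation under test.
func revisionSketch(list map[string]string) string {
	keys := make([]string, 0, len(list))
	for k := range list {
		keys = append(keys, k)
	}
	// Sorting the keys makes the digest independent of map iteration order.
	sort.Strings(keys)

	h := sha256.New()
	for _, k := range keys {
		h.Write([]byte(k + list[k]))
	}
	// A nil or empty map hashes no input, which yields
	// e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.
	return fmt.Sprintf("%x", h.Sum(nil))
}

func main() {
	fmt.Println(revisionSketch(nil))
	fmt.Println(revisionSketch(map[string]string{"one": "one", "two": "two", "three": "three"}))
}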

// helpers

func mockFile(root, path, content string) error {
	filePath := filepath.Join(root, path)
	if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil {

@@ -80,3 +588,120 @@ func mockFile(root, path, content string) error {
	}
	return nil
}

type s3MockObject struct {
	Key          string
	LastModified time.Time
	ContentType  string
	Content      []byte
}

type s3MockServer struct {
	srv *httptest.Server
	mux *http.ServeMux

	BucketName string
	Objects    []*s3MockObject
}

func newS3Server(bucketName string) *s3MockServer {
	s := &s3MockServer{BucketName: bucketName}
	s.mux = http.NewServeMux()
	s.mux.Handle(fmt.Sprintf("/%s/", s.BucketName), http.HandlerFunc(s.handler))

	s.srv = httptest.NewUnstartedServer(s.mux)

	return s
}

func (s *s3MockServer) Start() {
	s.srv.Start()
}

func (s *s3MockServer) Stop() {
	s.srv.Close()
}

func (s *s3MockServer) HTTPAddress() string {
	return s.srv.URL
}

func (s *s3MockServer) handler(w http.ResponseWriter, r *http.Request) {
	key := path.Base(r.URL.Path)

	switch key {
	case s.BucketName:
		w.Header().Add("Content-Type", "application/xml")

		if r.Method == http.MethodHead {
			return
		}

		q := r.URL.Query()

		if q["location"] != nil {
			fmt.Fprint(w, `
<?xml version="1.0" encoding="UTF-8"?>
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">Europe</LocationConstraint>
			`)
			return
		}

		contents := ""
		for _, o := range s.Objects {
			etag := md5.Sum(o.Content)
			contents += fmt.Sprintf(`
		<Contents>
			<Key>%s</Key>
			<LastModified>%s</LastModified>
			<Size>%d</Size>
			<ETag>"%b"</ETag>
			<StorageClass>STANDARD</StorageClass>
		</Contents>`, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag)
		}

		fmt.Fprintf(w, `
<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
	<Name>%s</Name>
	<Prefix/>
	<Marker/>
	<KeyCount>%d</KeyCount>
	<MaxKeys>1000</MaxKeys>
	<IsTruncated>false</IsTruncated>
	%s
</ListBucketResult>
		`, s.BucketName, len(s.Objects), contents)
	default:
		key, err := filepath.Rel("/"+s.BucketName, r.URL.Path)
		if err != nil {
			w.WriteHeader(500)
			return
		}

		var found *s3MockObject
		for _, o := range s.Objects {
			if key == o.Key {
				found = o
			}
		}
		if found == nil {
			w.WriteHeader(404)
			return
		}

		etag := md5.Sum(found.Content)
		lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1)

		w.Header().Add("Content-Type", found.ContentType)
		w.Header().Add("Last-Modified", lastModified)
		w.Header().Add("ETag", fmt.Sprintf("\"%b\"", etag))
		w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content)))

		if r.Method == http.MethodHead {
			return
		}

		w.Write(found.Content)
	}
}
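
As an aside, the mock above can be exercised on its own, which can help when debugging listing or ETag behaviour. The snippet below is a hypothetical standalone example, not part of the commit; it reuses `newS3Server` and `s3MockObject` from this file and assumes `io` is available alongside the existing imports.

// Hypothetical standalone exercise of the mock defined above (illustration only).
func Example_s3MockServer() {
	server := newS3Server("dummy")
	server.Objects = []*s3MockObject{{
		Key:          "test.txt",
		Content:      []byte("test"),
		ContentType:  "text/plain",
		LastModified: time.Now(),
	}}
	server.Start()
	defer server.Stop()

	// Objects are served under /<bucket>/<key>; the handler answers both HEAD and GET.
	resp, err := http.Get(server.HTTPAddress() + "/dummy/test.txt")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
	// Output: test
}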
@@ -29,11 +29,11 @@ import (
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/fluxcd/pkg/runtime/controller"
	"github.com/fluxcd/pkg/runtime/testenv"
	"github.com/fluxcd/pkg/testserver"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
	// +kubebuilder:scaffold:imports
)

// These tests make use of plain Go using Gomega for assertions.
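
The comment above summarizes the approach: the tests wrap `*testing.T` in a Gomega instance via `NewWithT` and assert with `Expect`, with no Ginkgo runner involved. A trivial, self-contained illustration of that pattern (not part of the commit):

package example_test

import (
	"testing"

	. "github.com/onsi/gomega"
)

func TestPlainGoWithGomega(t *testing.T) {
	// Failures are reported through t, so this runs under plain `go test`.
	g := NewWithT(t)

	g.Expect(1 + 1).To(Equal(2))
	g.Expect([]string{"a", "b"}).To(ContainElement("b"))
}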
@@ -98,6 +98,15 @@ func TestMain(m *testing.M) {
		panic(fmt.Sprintf("Failed to start GitRepositoryReconciler: %v", err))
	}

	if err := (&BucketReconciler{
		Client:  testEnv,
		Events:  testEventsH,
		Metrics: testMetricsH,
		Storage: testStorage,
	}).SetupWithManager(testEnv); err != nil {
		panic(fmt.Sprintf("Failed to start BucketReconciler: %v", err))
	}

	go func() {
		fmt.Println("Starting the test environment")
		if err := testEnv.Start(ctx); err != nil {
main.go (31 changed lines)
@@ -25,13 +25,6 @@ import (
	"strings"
	"time"

	"github.com/fluxcd/pkg/runtime/client"
	helper "github.com/fluxcd/pkg/runtime/controller"
	"github.com/fluxcd/pkg/runtime/events"
	"github.com/fluxcd/pkg/runtime/leaderelection"
	"github.com/fluxcd/pkg/runtime/logger"
	"github.com/fluxcd/pkg/runtime/pprof"
	"github.com/fluxcd/pkg/runtime/probes"
	"github.com/go-logr/logr"
	flag "github.com/spf13/pflag"
	"helm.sh/helm/v3/pkg/getter"

@@ -41,6 +34,14 @@ import (
	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/fluxcd/pkg/runtime/client"
	helper "github.com/fluxcd/pkg/runtime/controller"
	"github.com/fluxcd/pkg/runtime/events"
	"github.com/fluxcd/pkg/runtime/leaderelection"
	"github.com/fluxcd/pkg/runtime/logger"
	"github.com/fluxcd/pkg/runtime/pprof"
	"github.com/fluxcd/pkg/runtime/probes"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
	"github.com/fluxcd/source-controller/controllers"
	// +kubebuilder:scaffold:imports
@@ -144,14 +145,14 @@ func main() {
	probes.SetupChecks(mgr, setupLog)
	pprof.SetupHandlers(mgr, setupLog)

	eventsH := helper.MakeEvents(mgr, controllerName, eventRecorder)
	metricsH := helper.MustMakeMetrics(mgr)

	if storageAdvAddr == "" {
		storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog)
	}
	storage := mustInitStorage(storagePath, storageAdvAddr, setupLog)

	if err = (&controllers.GitRepositoryReconciler{
		Client: mgr.GetClient(),
		Events: eventsH,
@@ -193,12 +194,10 @@
		os.Exit(1)
	}
	if err = (&controllers.BucketReconciler{
		Client:  mgr.GetClient(),
		Events:  eventsH,
		Metrics: metricsH,
		Storage: storage,
	}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
		MaxConcurrentReconciles: concurrent,
	}); err != nil {