Rewrite `GitRepositoryReconciler` to new standards
This commit rewrites the `GitRepositoryReconciler` to new standards, while implementing the newly introduced Condition types, and trying to adhere better to Kubernetes API conventions. More specifically it introduces: - Implementation of more explicit Condition types to highlight abnormalities. - Extensive usage of the `conditions` subpackage from `runtime`. - Better and more conflict-resilient (status)patching of reconciled objects using the `patch` subpackage from runtime. - Proper implementation of kstatus' `Reconciling` and `Stalled` conditions. - First (integration) tests that solely rely on `testenv` and do not use Ginkgo. There are a couple of TODOs marked in-code, these are suggestions for the future and should be non-blocking. In addition to the TODOs, more complex and/or edge-case test scenarios may be added as well. Signed-off-by: Hidde Beydals <hello@hidde.co>
This commit is contained in:
parent
3bda2c931d
commit
29caae3fca
|
@ -19,12 +19,10 @@ package v1beta2
|
|||
import (
|
||||
"time"
|
||||
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"github.com/fluxcd/pkg/apis/acl"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
"github.com/fluxcd/pkg/runtime/conditions"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -202,48 +200,6 @@ const (
|
|||
GitOperationFailedReason string = "GitOperationFailed"
|
||||
)
|
||||
|
||||
// GitRepositoryProgressing resets the conditions of the GitRepository to
|
||||
// metav1.Condition of type meta.ReadyCondition with status 'Unknown' and
|
||||
// meta.ProgressingReason reason and message. It returns the modified
|
||||
// GitRepository.
|
||||
func GitRepositoryProgressing(repository GitRepository) GitRepository {
|
||||
repository.Status.ObservedGeneration = repository.Generation
|
||||
repository.Status.URL = ""
|
||||
repository.Status.Conditions = []metav1.Condition{}
|
||||
conditions.MarkUnknown(&repository, meta.ReadyCondition, meta.ProgressingReason, "reconciliation in progress")
|
||||
return repository
|
||||
}
|
||||
|
||||
// GitRepositoryReady sets the given Artifact and URL on the GitRepository and
|
||||
// sets the meta.ReadyCondition to 'True', with the given reason and message. It
|
||||
// returns the modified GitRepository.
|
||||
func GitRepositoryReady(repository GitRepository, artifact Artifact, includedArtifacts []*Artifact, url, reason, message string) GitRepository {
|
||||
repository.Status.Artifact = &artifact
|
||||
repository.Status.IncludedArtifacts = includedArtifacts
|
||||
repository.Status.URL = url
|
||||
conditions.MarkTrue(&repository, meta.ReadyCondition, reason, message)
|
||||
return repository
|
||||
}
|
||||
|
||||
// GitRepositoryNotReady sets the meta.ReadyCondition on the given GitRepository
|
||||
// to 'False', with the given reason and message. It returns the modified
|
||||
// GitRepository.
|
||||
func GitRepositoryNotReady(repository GitRepository, reason, message string) GitRepository {
|
||||
conditions.MarkFalse(&repository, meta.ReadyCondition, reason, message)
|
||||
return repository
|
||||
}
|
||||
|
||||
// GitRepositoryReadyMessage returns the message of the metav1.Condition of type
|
||||
// meta.ReadyCondition with status 'True' if present, or an empty string.
|
||||
func GitRepositoryReadyMessage(repository GitRepository) string {
|
||||
if c := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); c != nil {
|
||||
if c.Status == metav1.ConditionTrue {
|
||||
return c.Message
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// GetConditions returns the status conditions of the object.
|
||||
func (in GitRepository) GetConditions() []metav1.Condition {
|
||||
return in.Status.Conditions
|
||||
|
|
|
@ -20,18 +20,15 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
securejoin "github.com/cyphar/filepath-securejoin"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
kerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
kuberecorder "k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/tools/reference"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/builder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
@ -40,14 +37,16 @@ import (
|
|||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
"github.com/fluxcd/pkg/runtime/conditions"
|
||||
helper "github.com/fluxcd/pkg/runtime/controller"
|
||||
"github.com/fluxcd/pkg/runtime/events"
|
||||
"github.com/fluxcd/pkg/runtime/metrics"
|
||||
"github.com/fluxcd/pkg/runtime/patch"
|
||||
"github.com/fluxcd/pkg/runtime/predicates"
|
||||
"github.com/fluxcd/source-controller/pkg/sourceignore"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||
"github.com/fluxcd/source-controller/pkg/git"
|
||||
"github.com/fluxcd/source-controller/pkg/git/strategy"
|
||||
"github.com/fluxcd/source-controller/pkg/sourceignore"
|
||||
)
|
||||
|
||||
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
|
||||
|
@ -58,12 +57,12 @@ import (
|
|||
// GitRepositoryReconciler reconciles a GitRepository object
|
||||
type GitRepositoryReconciler struct {
|
||||
client.Client
|
||||
requeueDependency time.Duration
|
||||
Scheme *runtime.Scheme
|
||||
kuberecorder.EventRecorder
|
||||
helper.Metrics
|
||||
|
||||
Storage *Storage
|
||||
EventRecorder kuberecorder.EventRecorder
|
||||
ExternalEventRecorder *events.Recorder
|
||||
MetricsRecorder *metrics.Recorder
|
||||
|
||||
requeueDependency time.Duration
|
||||
}
|
||||
|
||||
type GitRepositoryReconcilerOptions struct {
|
||||
|
@ -86,398 +85,503 @@ func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, o
|
|||
Complete(r)
|
||||
}
|
||||
|
||||
func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
|
||||
start := time.Now()
|
||||
log := ctrl.LoggerFrom(ctx)
|
||||
|
||||
var repository sourcev1.GitRepository
|
||||
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil {
|
||||
// Fetch the GitRepository
|
||||
obj := &sourcev1.GitRepository{}
|
||||
if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
|
||||
return ctrl.Result{}, client.IgnoreNotFound(err)
|
||||
}
|
||||
|
||||
// Record suspended status metric
|
||||
defer r.recordSuspension(ctx, repository)
|
||||
r.RecordSuspend(ctx, obj, obj.Spec.Suspend)
|
||||
|
||||
// Add our finalizer if it does not exist
|
||||
if !controllerutil.ContainsFinalizer(&repository, sourcev1.SourceFinalizer) {
|
||||
patch := client.MergeFrom(repository.DeepCopy())
|
||||
controllerutil.AddFinalizer(&repository, sourcev1.SourceFinalizer)
|
||||
if err := r.Patch(ctx, &repository, patch); err != nil {
|
||||
log.Error(err, "unable to register finalizer")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Examine if the object is under deletion
|
||||
if !repository.ObjectMeta.DeletionTimestamp.IsZero() {
|
||||
return r.reconcileDelete(ctx, repository)
|
||||
}
|
||||
|
||||
// Return early if the object is suspended.
|
||||
if repository.Spec.Suspend {
|
||||
// Return early if the object is suspended
|
||||
if obj.Spec.Suspend {
|
||||
log.Info("Reconciliation is suspended for this object")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// check dependencies
|
||||
if len(repository.Spec.Include) > 0 {
|
||||
if err := r.checkDependencies(repository); err != nil {
|
||||
repository = sourcev1.GitRepositoryNotReady(repository, "DependencyNotReady", err.Error())
|
||||
if err := r.updateStatus(ctx, req, repository.Status); err != nil {
|
||||
log.Error(err, "unable to update status for dependency not ready")
|
||||
return ctrl.Result{Requeue: true}, err
|
||||
}
|
||||
// we can't rely on exponential backoff because it will prolong the execution too much,
|
||||
// instead we requeue on a fix interval.
|
||||
msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String())
|
||||
log.Info(msg)
|
||||
r.event(ctx, repository, events.EventSeverityInfo, msg)
|
||||
r.recordReadiness(ctx, repository)
|
||||
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
|
||||
}
|
||||
log.Info("All dependencies area ready, proceeding with reconciliation")
|
||||
}
|
||||
|
||||
// record reconciliation duration
|
||||
if r.MetricsRecorder != nil {
|
||||
objRef, err := reference.GetReference(r.Scheme, &repository)
|
||||
// Initialize the patch helper
|
||||
patchHelper, err := patch.NewHelper(obj, r.Client)
|
||||
if err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
defer r.MetricsRecorder.RecordDuration(*objRef, start)
|
||||
}
|
||||
|
||||
// set initial status
|
||||
if resetRepository, ok := r.resetStatus(repository); ok {
|
||||
repository = resetRepository
|
||||
if err := r.updateStatus(ctx, req, repository.Status); err != nil {
|
||||
log.Error(err, "unable to update status")
|
||||
return ctrl.Result{Requeue: true}, err
|
||||
}
|
||||
r.recordReadiness(ctx, repository)
|
||||
}
|
||||
|
||||
// record the value of the reconciliation request, if any
|
||||
// TODO(hidde): would be better to defer this in combination with
|
||||
// always patching the status sub-resource after a reconciliation.
|
||||
if v, ok := meta.ReconcileAnnotationValue(repository.GetAnnotations()); ok {
|
||||
repository.Status.SetLastHandledReconcileRequest(v)
|
||||
}
|
||||
|
||||
// purge old artifacts from storage
|
||||
if err := r.gc(repository); err != nil {
|
||||
log.Error(err, "unable to purge old artifacts")
|
||||
}
|
||||
|
||||
// reconcile repository by pulling the latest Git commit
|
||||
reconciledRepository, reconcileErr := r.reconcile(ctx, *repository.DeepCopy())
|
||||
|
||||
// update status with the reconciliation result
|
||||
if err := r.updateStatus(ctx, req, reconciledRepository.Status); err != nil {
|
||||
log.Error(err, "unable to update status")
|
||||
return ctrl.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
// if reconciliation failed, record the failure and requeue immediately
|
||||
if reconcileErr != nil {
|
||||
r.event(ctx, reconciledRepository, events.EventSeverityError, reconcileErr.Error())
|
||||
r.recordReadiness(ctx, reconciledRepository)
|
||||
return ctrl.Result{Requeue: true}, reconcileErr
|
||||
}
|
||||
|
||||
// emit revision change event
|
||||
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
|
||||
r.event(ctx, reconciledRepository, events.EventSeverityInfo, sourcev1.GitRepositoryReadyMessage(reconciledRepository))
|
||||
}
|
||||
r.recordReadiness(ctx, reconciledRepository)
|
||||
|
||||
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
|
||||
time.Since(start).String(),
|
||||
repository.GetInterval().Duration.String(),
|
||||
))
|
||||
|
||||
return ctrl.Result{RequeueAfter: repository.GetInterval().Duration}, nil
|
||||
}
|
||||
|
||||
func (r *GitRepositoryReconciler) checkDependencies(repository sourcev1.GitRepository) error {
|
||||
for _, d := range repository.Spec.Include {
|
||||
dName := types.NamespacedName{Name: d.GitRepositoryRef.Name, Namespace: repository.Namespace}
|
||||
var gr sourcev1.GitRepository
|
||||
err := r.Get(context.Background(), dName, &gr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get '%s' dependency: %w", dName, err)
|
||||
}
|
||||
|
||||
if len(gr.Status.Conditions) == 0 || gr.Generation != gr.Status.ObservedGeneration {
|
||||
return fmt.Errorf("dependency '%s' is not ready", dName)
|
||||
}
|
||||
|
||||
if !apimeta.IsStatusConditionTrue(gr.Status.Conditions, meta.ReadyCondition) {
|
||||
return fmt.Errorf("dependency '%s' is not ready", dName)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *GitRepositoryReconciler) reconcile(ctx context.Context, repository sourcev1.GitRepository) (sourcev1.GitRepository, error) {
|
||||
log := ctrl.LoggerFrom(ctx)
|
||||
|
||||
// create tmp dir for the Git clone
|
||||
tmpGit, err := os.MkdirTemp("", repository.Name)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("tmp dir error: %w", err)
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
// Always attempt to patch the object and status after each reconciliation
|
||||
defer func() {
|
||||
if err := os.RemoveAll(tmpGit); err != nil {
|
||||
log.Error(err, "failed to remove working directory", "path", tmpGit)
|
||||
// Record the value of the reconciliation request, if any
|
||||
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
|
||||
obj.Status.SetLastHandledReconcileRequest(v)
|
||||
}
|
||||
|
||||
// Summarize the Ready condition based on abnormalities that may have been observed.
|
||||
conditions.SetSummary(obj,
|
||||
meta.ReadyCondition,
|
||||
conditions.WithConditions(
|
||||
sourcev1.IncludeUnavailableCondition,
|
||||
sourcev1.SourceVerifiedCondition,
|
||||
sourcev1.CheckoutFailedCondition,
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
sourcev1.ArtifactUnavailableCondition,
|
||||
),
|
||||
conditions.WithNegativePolarityConditions(
|
||||
sourcev1.ArtifactUnavailableCondition,
|
||||
sourcev1.CheckoutFailedCondition,
|
||||
sourcev1.SourceVerifiedCondition,
|
||||
sourcev1.IncludeUnavailableCondition,
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
),
|
||||
)
|
||||
|
||||
// Patch the object, ignoring conflicts on the conditions owned by this controller
|
||||
patchOpts := []patch.Option{
|
||||
patch.WithOwnedConditions{
|
||||
Conditions: []string{
|
||||
sourcev1.ArtifactUnavailableCondition,
|
||||
sourcev1.CheckoutFailedCondition,
|
||||
sourcev1.IncludeUnavailableCondition,
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
meta.ReadyCondition,
|
||||
meta.ReconcilingCondition,
|
||||
meta.StalledCondition,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Determine if the resource is still being reconciled, or if it has stalled, and record this observation
|
||||
if retErr == nil && (result.IsZero() || !result.Requeue) {
|
||||
// We are no longer reconciling
|
||||
conditions.Delete(obj, meta.ReconcilingCondition)
|
||||
|
||||
// We have now observed this generation
|
||||
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
|
||||
|
||||
readyCondition := conditions.Get(obj, meta.ReadyCondition)
|
||||
switch readyCondition.Status {
|
||||
case metav1.ConditionFalse:
|
||||
// As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled
|
||||
conditions.MarkStalled(obj, readyCondition.Reason, readyCondition.Message)
|
||||
case metav1.ConditionTrue:
|
||||
// As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled
|
||||
conditions.Delete(obj, meta.StalledCondition)
|
||||
}
|
||||
}
|
||||
|
||||
// Finally, patch the resource
|
||||
if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
|
||||
retErr = kerrors.NewAggregate([]error{retErr, err})
|
||||
}
|
||||
|
||||
// Always record readiness and duration metrics
|
||||
r.Metrics.RecordReadiness(ctx, obj)
|
||||
r.Metrics.RecordDuration(ctx, obj, start)
|
||||
}()
|
||||
|
||||
// Configure auth options using secret
|
||||
var authOpts *git.AuthOptions
|
||||
if repository.Spec.SecretRef != nil {
|
||||
// Add finalizer first if not exist to avoid the race condition
|
||||
// between init and delete
|
||||
if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) {
|
||||
controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer)
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
}
|
||||
|
||||
// Examine if the object is under deletion
|
||||
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
|
||||
return r.reconcileDelete(ctx, obj)
|
||||
}
|
||||
|
||||
// Reconcile actual object
|
||||
return r.reconcile(ctx, obj)
|
||||
}
|
||||
|
||||
// reconcile steps through the actual reconciliation tasks for the object, it returns early on the first step that
|
||||
// produces an error.
|
||||
func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.GitRepository) (ctrl.Result, error) {
|
||||
// Mark the resource as under reconciliation
|
||||
conditions.MarkReconciling(obj, meta.ProgressingReason, "")
|
||||
|
||||
// Reconcile the storage data
|
||||
if result, err := r.reconcileStorage(ctx, obj); err != nil || result.IsZero() {
|
||||
return result, err
|
||||
}
|
||||
|
||||
// Create temp dir for Git clone
|
||||
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
|
||||
if err != nil {
|
||||
r.Eventf(obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason, "Failed to create temporary directory: %s", err)
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
// Reconcile the source from upstream
|
||||
var artifact sourcev1.Artifact
|
||||
if result, err := r.reconcileSource(ctx, obj, &artifact, tmpDir); err != nil || result.IsZero() {
|
||||
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, err
|
||||
}
|
||||
|
||||
// Reconcile includes from the storage
|
||||
var includes artifactSet
|
||||
if result, err := r.reconcileInclude(ctx, obj, includes, tmpDir); err != nil || result.IsZero() {
|
||||
return ctrl.Result{RequeueAfter: r.requeueDependency}, err
|
||||
}
|
||||
|
||||
// Reconcile the artifact to storage
|
||||
if result, err := r.reconcileArtifact(ctx, obj, artifact, includes, tmpDir); err != nil || result.IsZero() {
|
||||
return result, err
|
||||
}
|
||||
|
||||
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
|
||||
}
|
||||
|
||||
// reconcileStorage ensures the current state of the storage matches the desired and previously observed state.
|
||||
//
|
||||
// All artifacts for the resource except for the current one are garbage collected from the storage.
|
||||
// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
|
||||
// If the object does not have an artifact in its Status object, a v1beta1.ArtifactUnavailableCondition is set.
|
||||
// If the hostname of any of the URLs on the object do not match the current storage server hostname, they are updated.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *GitRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.GitRepository) (ctrl.Result, error) {
|
||||
// Garbage collect previous advertised artifact(s) from storage
|
||||
_ = r.garbageCollect(ctx, obj)
|
||||
|
||||
// Determine if the advertised artifact is still in storage
|
||||
if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) {
|
||||
obj.Status.Artifact = nil
|
||||
obj.Status.URL = ""
|
||||
}
|
||||
|
||||
// Record that we do not have an artifact
|
||||
if obj.GetArtifact() == nil {
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage")
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
}
|
||||
conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
|
||||
|
||||
// Always update URLs to ensure hostname is up-to-date
|
||||
// TODO(hidde): we may want to send out an event only if we notice the URL has changed
|
||||
r.Storage.SetArtifactURL(obj.GetArtifact())
|
||||
obj.Status.URL = r.Storage.SetHostname(obj.Status.URL)
|
||||
|
||||
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
|
||||
}
|
||||
|
||||
// reconcileSource ensures the upstream Git repository can be reached and checked out using the declared configuration,
|
||||
// and observes its state.
|
||||
//
|
||||
// The repository is checked out to the given dir using the defined configuration, and in case of an error during the
|
||||
// checkout process (including transient errors), it records v1beta1.CheckoutFailedCondition=True and returns early.
|
||||
// On a successful checkout it removes v1beta1.CheckoutFailedCondition, and compares the current revision of HEAD to the
|
||||
// artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
|
||||
// If instructed, the signature of the commit is verified if and recorded as v1beta1.SourceVerifiedCondition. If the
|
||||
// signature can not be verified or the verification fails, the Condition=False and it returns early.
|
||||
// If both the checkout and signature verification are successful, the given artifact pointer is set to a new artifact
|
||||
// with the available metadata.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *GitRepositoryReconciler) reconcileSource(ctx context.Context,
|
||||
obj *sourcev1.GitRepository, artifact *sourcev1.Artifact, dir string) (ctrl.Result, error) {
|
||||
// Configure authentication strategy to access the source
|
||||
authOpts := &git.AuthOptions{}
|
||||
if obj.Spec.SecretRef != nil {
|
||||
// Attempt to retrieve secret
|
||||
name := types.NamespacedName{
|
||||
Namespace: repository.GetNamespace(),
|
||||
Name: repository.Spec.SecretRef.Name,
|
||||
Namespace: obj.GetNamespace(),
|
||||
Name: obj.Spec.SecretRef.Name,
|
||||
}
|
||||
var secret corev1.Secret
|
||||
if err := r.Client.Get(ctx, name, &secret); err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.AuthenticationFailedReason,
|
||||
"Failed to get secret '%s': %s", name.String(), err.Error())
|
||||
r.Eventf(obj, events.EventSeverityError, sourcev1.AuthenticationFailedReason,
|
||||
"Failed to get secret '%s': %s", name.String(), err.Error())
|
||||
// Return error as the world as observed may change
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
secret := &corev1.Secret{}
|
||||
err = r.Client.Get(ctx, name, secret)
|
||||
// Configure strategy with secret
|
||||
var err error
|
||||
authOpts, err = git.AuthOptionsFromSecret(obj.Spec.URL, &secret)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("auth secret error: %w", err)
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.AuthenticationFailedReason, err.Error()), err
|
||||
conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.AuthenticationFailedReason,
|
||||
"Failed to configure auth strategy for Git implementation %q: %s", obj.Spec.GitImplementation, err)
|
||||
r.Eventf(obj, events.EventSeverityError, sourcev1.AuthenticationFailedReason,
|
||||
"Failed to configure auth strategy for Git implementation %q: %s", obj.Spec.GitImplementation, err)
|
||||
// Return error as the contents of the secret may change
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
}
|
||||
|
||||
authOpts, err = git.AuthOptionsFromSecret(repository.Spec.URL, secret)
|
||||
if err != nil {
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.AuthenticationFailedReason, err.Error()), err
|
||||
}
|
||||
}
|
||||
checkoutOpts := git.CheckoutOptions{RecurseSubmodules: repository.Spec.RecurseSubmodules}
|
||||
if ref := repository.Spec.Reference; ref != nil {
|
||||
// Configure checkout strategy
|
||||
checkoutOpts := git.CheckoutOptions{RecurseSubmodules: obj.Spec.RecurseSubmodules}
|
||||
if ref := obj.Spec.Reference; ref != nil {
|
||||
checkoutOpts.Branch = ref.Branch
|
||||
checkoutOpts.Commit = ref.Commit
|
||||
checkoutOpts.Tag = ref.Tag
|
||||
checkoutOpts.SemVer = ref.SemVer
|
||||
}
|
||||
checkoutStrategy, err := strategy.CheckoutStrategyForImplementation(ctx,
|
||||
git.Implementation(repository.Spec.GitImplementation), checkoutOpts)
|
||||
git.Implementation(obj.Spec.GitImplementation), checkoutOpts)
|
||||
if err != nil {
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.GitOperationFailedReason, err.Error()), err
|
||||
conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.GitOperationFailedReason,
|
||||
"Failed to configure checkout strategy for Git implementation %q: %s", obj.Spec.GitImplementation, err)
|
||||
// Do not return err as recovery without changes is impossible
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
gitCtx, cancel := context.WithTimeout(ctx, repository.Spec.Timeout.Duration)
|
||||
// Checkout HEAD of reference in object
|
||||
gitCtx, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
|
||||
defer cancel()
|
||||
|
||||
commit, err := checkoutStrategy.Checkout(gitCtx, tmpGit, repository.Spec.URL, authOpts)
|
||||
commit, err := checkoutStrategy.Checkout(gitCtx, dir, obj.Spec.URL, authOpts)
|
||||
if err != nil {
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.GitOperationFailedReason, err.Error()), err
|
||||
conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.GitOperationFailedReason,
|
||||
"Failed to checkout and determine revision: %s", err)
|
||||
r.Eventf(obj, events.EventSeverityError, sourcev1.GitOperationFailedReason,
|
||||
"Failed to checkout and determine revision: %s", err)
|
||||
// Coin flip on transient or persistent error, return error and hope for the best
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
artifact := r.Storage.NewArtifactFor(repository.Kind, repository.GetObjectMeta(), commit.String(), fmt.Sprintf("%s.tar.gz", commit.Hash.String()))
|
||||
r.Eventf(obj, events.EventSeverityInfo, sourcev1.GitOperationSucceedReason,
|
||||
"Cloned repository '%s' and checked out revision '%s'", obj.Spec.URL, commit.String())
|
||||
conditions.Delete(obj, sourcev1.CheckoutFailedCondition)
|
||||
|
||||
// copy all included repository into the artifact
|
||||
includedArtifacts := []*sourcev1.Artifact{}
|
||||
for _, incl := range repository.Spec.Include {
|
||||
dName := types.NamespacedName{Name: incl.GitRepositoryRef.Name, Namespace: repository.Namespace}
|
||||
var gr sourcev1.GitRepository
|
||||
err := r.Get(context.Background(), dName, &gr)
|
||||
// Verify commit signature
|
||||
if result, err := r.verifyCommitSignature(ctx, obj, *commit); err != nil || result.IsZero() {
|
||||
return result, err
|
||||
}
|
||||
|
||||
// Create potential new artifact with current available metadata
|
||||
*artifact = r.Storage.NewArtifactFor(obj.Kind, obj.GetObjectMeta(), commit.String(), fmt.Sprintf("%s.tar.gz", commit.Hash.String()))
|
||||
|
||||
// Mark observations about the revision on the object
|
||||
if !obj.GetArtifact().HasRevision(commit.String()) {
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '%s'", commit.String())
|
||||
}
|
||||
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
|
||||
}
|
||||
|
||||
// reconcileArtifact archives a new artifact to the storage, if the current observation on the object does not match the
|
||||
// given data.
|
||||
//
|
||||
// The inspection of the given data to the object is differed, ensuring any stale observations as
|
||||
// v1beta1.ArtifactUnavailableCondition and v1beta1.ArtifactOutdatedCondition are always deleted.
|
||||
// If the given artifact and/or includes do not differ from the object's current, it returns early.
|
||||
// Source ignore patterns are loaded, and the given directory is archived.
|
||||
// On a successful archive, the artifact and includes in the status of the given object are set, and the symlink in the
|
||||
// storage is updated to its path.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.GitRepository, artifact sourcev1.Artifact, includes artifactSet, dir string) (ctrl.Result, error) {
|
||||
// Always restore the Ready condition in case it got removed due to a transient error
|
||||
defer func() {
|
||||
if obj.GetArtifact() != nil {
|
||||
conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
|
||||
}
|
||||
if obj.GetArtifact().HasRevision(artifact.Revision) && !includes.Diff(obj.Status.IncludedArtifacts) {
|
||||
conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
|
||||
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason,
|
||||
"Stored artifact for revision '%s'", artifact.Revision)
|
||||
}
|
||||
}()
|
||||
|
||||
// The artifact is up-to-date
|
||||
if obj.GetArtifact().HasRevision(artifact.Revision) && !includes.Diff(obj.Status.IncludedArtifacts) {
|
||||
ctrl.LoggerFrom(ctx).Info("Artifact is up-to-date")
|
||||
return ctrl.Result{RequeueAfter: obj.GetInterval().Duration}, nil
|
||||
}
|
||||
|
||||
// Ensure target path exists and is a directory
|
||||
if f, err := os.Stat(dir); err != nil {
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to stat source path")
|
||||
return ctrl.Result{}, err
|
||||
} else if !f.IsDir() {
|
||||
ctrl.LoggerFrom(ctx).Error(err, fmt.Sprintf("source path '%s' is not a directory", dir))
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Ensure artifact directory exists and acquire lock
|
||||
if err := r.Storage.MkdirAll(artifact); err != nil {
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to create artifact directory")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
unlock, err := r.Storage.Lock(artifact)
|
||||
if err != nil {
|
||||
return sourcev1.GitRepositoryNotReady(repository, "DependencyNotReady", err.Error()), err
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to acquire lock for artifact")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
includedArtifacts = append(includedArtifacts, gr.GetArtifact())
|
||||
defer unlock()
|
||||
|
||||
// Load ignore rules for archiving
|
||||
ps, err := sourceignore.LoadIgnorePatterns(dir, nil)
|
||||
if err != nil {
|
||||
r.Eventf(obj, events.EventSeverityError,
|
||||
"SourceIgnoreError", "Failed to load source ignore patterns from repository: %s", err)
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
if obj.Spec.Ignore != nil {
|
||||
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
|
||||
}
|
||||
|
||||
// return early on unchanged revision and unchanged included repositories
|
||||
if apimeta.IsStatusConditionTrue(repository.Status.Conditions, meta.ReadyCondition) && repository.GetArtifact().HasRevision(artifact.Revision) && !hasArtifactUpdated(repository.Status.IncludedArtifacts, includedArtifacts) {
|
||||
if artifact.URL != repository.GetArtifact().URL {
|
||||
r.Storage.SetArtifactURL(repository.GetArtifact())
|
||||
repository.Status.URL = r.Storage.SetHostname(repository.Status.URL)
|
||||
// Archive directory to storage
|
||||
if err := r.Storage.Archive(&artifact, dir, SourceIgnoreFilter(ps, nil)); err != nil {
|
||||
r.Eventf(obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason,
|
||||
"Unable to archive artifact to storage: %s", err)
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
return repository, nil
|
||||
r.AnnotatedEventf(obj, map[string]string{
|
||||
"revision": artifact.Revision,
|
||||
"checksum": artifact.Checksum,
|
||||
}, events.EventSeverityInfo, "NewArtifact", "Stored artifact for revision '%s'", artifact.Revision)
|
||||
|
||||
// Record it on the object
|
||||
obj.Status.Artifact = artifact.DeepCopy()
|
||||
obj.Status.IncludedArtifacts = includes
|
||||
|
||||
// Update symlink on a "best effort" basis
|
||||
url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
|
||||
if err != nil {
|
||||
r.Eventf(obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason,
|
||||
"Failed to update status URL symlink: %s", err)
|
||||
}
|
||||
if url != "" {
|
||||
obj.Status.URL = url
|
||||
}
|
||||
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
|
||||
}
|
||||
|
||||
// reconcileInclude reconciles the declared includes from the object by copying their artifact (sub)contents to the
|
||||
// declared paths in the given directory.
|
||||
//
|
||||
// If an include is unavailable, it marks the object with v1beta1.IncludeUnavailableCondition and returns early.
|
||||
// If the copy operations are successful, it deletes the v1beta1.IncludeUnavailableCondition from the object.
|
||||
// If the artifactSet differs from the current set, it marks the object with v1beta1.ArtifactOutdatedCondition.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *GitRepositoryReconciler) reconcileInclude(ctx context.Context, obj *sourcev1.GitRepository, artifacts artifactSet, dir string) (ctrl.Result, error) {
|
||||
artifacts = make(artifactSet, len(obj.Spec.Include))
|
||||
for i, incl := range obj.Spec.Include {
|
||||
// Do this first as it is much cheaper than copy operations
|
||||
toPath, err := securejoin.SecureJoin(dir, incl.GetToPath())
|
||||
if err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.IncludeUnavailableCondition, "IllegalPath",
|
||||
"Path calculation for include %q failed: %s", incl.GitRepositoryRef.Name, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// verify PGP signature
|
||||
if repository.Spec.Verification != nil {
|
||||
// Retrieve the included GitRepository
|
||||
dep := &sourcev1.GitRepository{}
|
||||
if err := r.Get(ctx, types.NamespacedName{Namespace: obj.Namespace, Name: incl.GitRepositoryRef.Name}, dep); err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.IncludeUnavailableCondition, "NotFound",
|
||||
"Could not get resource for include %q: %s", incl.GitRepositoryRef.Name, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Confirm include has an artifact
|
||||
if dep.GetArtifact() == nil {
|
||||
conditions.MarkTrue(obj, sourcev1.IncludeUnavailableCondition, "NoArtifact",
|
||||
"No artifact available for include %q", incl.GitRepositoryRef.Name)
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// Copy artifact (sub)contents to configured directory
|
||||
if err := r.Storage.CopyToPath(dep.GetArtifact(), incl.GetFromPath(), toPath); err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.IncludeUnavailableCondition, "CopyFailure",
|
||||
"Failed to copy %q include from %s to %s: %s", incl.GitRepositoryRef.Name, incl.GetFromPath(), incl.GetToPath(), err.Error())
|
||||
r.Eventf(obj, events.EventSeverityError, sourcev1.IncludeUnavailableCondition,
|
||||
"Failed to copy %q include from %s to %s: %s", incl.GitRepositoryRef.Name, incl.GetFromPath(), incl.GetToPath(), err.Error())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
artifacts[i] = dep.GetArtifact().DeepCopy()
|
||||
}
|
||||
|
||||
// We now know all includes are available
|
||||
conditions.Delete(obj, sourcev1.IncludeUnavailableCondition)
|
||||
|
||||
// Observe if the artifacts still match the previous included ones
|
||||
if artifacts.Diff(obj.Status.IncludedArtifacts) {
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "IncludeChange", "Included artifacts differ from last observed includes")
|
||||
}
|
||||
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
|
||||
}
|
||||
|
||||
// reconcileDelete handles the delete of an object. It first garbage collects all artifacts for the object from the
|
||||
// artifact storage, if successful, the finalizer is removed from the object.
|
||||
func (r *GitRepositoryReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.GitRepository) (ctrl.Result, error) {
|
||||
// Garbage collect the resource's artifacts
|
||||
if err := r.garbageCollect(ctx, obj); err != nil {
|
||||
// Return the error so we retry the failed garbage collection
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Remove our finalizer from the list
|
||||
controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
|
||||
|
||||
// Stop reconciliation as the object is being deleted
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// verifyCommitSignature verifies the signature of the given commit if a verification mode is configured on the object.
|
||||
func (r *GitRepositoryReconciler) verifyCommitSignature(ctx context.Context, obj *sourcev1.GitRepository, commit git.Commit) (ctrl.Result, error) {
|
||||
// Check if there is a commit verification is configured and remove any old observations if there is none
|
||||
if obj.Spec.Verification == nil || obj.Spec.Verification.Mode == "" {
|
||||
conditions.Delete(obj, sourcev1.SourceVerifiedCondition)
|
||||
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
|
||||
}
|
||||
|
||||
// Get secret with GPG data
|
||||
publicKeySecret := types.NamespacedName{
|
||||
Namespace: repository.Namespace,
|
||||
Name: repository.Spec.Verification.SecretRef.Name,
|
||||
Namespace: obj.Namespace,
|
||||
Name: obj.Spec.Verification.SecretRef.Name,
|
||||
}
|
||||
secret := &corev1.Secret{}
|
||||
if err := r.Client.Get(ctx, publicKeySecret, secret); err != nil {
|
||||
err = fmt.Errorf("PGP public keys secret error: %w", err)
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.VerificationFailedReason, err.Error()), err
|
||||
conditions.MarkFalse(obj, sourcev1.SourceVerifiedCondition, meta.FailedReason, "PGP public keys secret error: %s", err.Error())
|
||||
r.Eventf(obj, events.EventSeverityError, "VerificationError", "PGP public keys secret error: %s", err.Error())
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
var keyRings []string
|
||||
for _, v := range secret.Data {
|
||||
keyRings = append(keyRings, string(v))
|
||||
}
|
||||
if _, err = commit.Verify(keyRings...); err != nil {
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.VerificationFailedReason, err.Error()), err
|
||||
}
|
||||
}
|
||||
|
||||
// create artifact dir
|
||||
err = r.Storage.MkdirAll(artifact)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("mkdir dir error: %w", err)
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
|
||||
for i, incl := range repository.Spec.Include {
|
||||
toPath, err := securejoin.SecureJoin(tmpGit, incl.GetToPath())
|
||||
if err != nil {
|
||||
return sourcev1.GitRepositoryNotReady(repository, "DependencyNotReady", err.Error()), err
|
||||
}
|
||||
err = r.Storage.CopyToPath(includedArtifacts[i], incl.GetFromPath(), toPath)
|
||||
if err != nil {
|
||||
return sourcev1.GitRepositoryNotReady(repository, "DependencyNotReady", err.Error()), err
|
||||
}
|
||||
}
|
||||
|
||||
// acquire lock
|
||||
unlock, err := r.Storage.Lock(artifact)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("unable to acquire lock: %w", err)
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
defer unlock()
|
||||
|
||||
// archive artifact and check integrity
|
||||
ignoreDomain := strings.Split(tmpGit, string(filepath.Separator))
|
||||
ps, err := sourceignore.LoadIgnorePatterns(tmpGit, ignoreDomain)
|
||||
if err != nil {
|
||||
err = fmt.Errorf(".sourceignore error: %w", err)
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
if repository.Spec.Ignore != nil {
|
||||
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*repository.Spec.Ignore), ignoreDomain)...)
|
||||
}
|
||||
if err := r.Storage.Archive(&artifact, tmpGit, SourceIgnoreFilter(ps, ignoreDomain)); err != nil {
|
||||
err = fmt.Errorf("storage archive error: %w", err)
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
|
||||
// update latest symlink
|
||||
url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
|
||||
if err != nil {
|
||||
err = fmt.Errorf("storage symlink error: %w", err)
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
|
||||
message := fmt.Sprintf("Fetched revision: %s", artifact.Revision)
|
||||
return sourcev1.GitRepositoryReady(repository, artifact, includedArtifacts, url, sourcev1.GitOperationSucceedReason, message), nil
|
||||
}
|
||||
|
||||
func (r *GitRepositoryReconciler) reconcileDelete(ctx context.Context, repository sourcev1.GitRepository) (ctrl.Result, error) {
|
||||
if err := r.gc(repository); err != nil {
|
||||
r.event(ctx, repository, events.EventSeverityError,
|
||||
fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
|
||||
// Return the error so we retry the failed garbage collection
|
||||
// Verify commit with GPG data from secret
|
||||
if _, err := commit.Verify(keyRings...); err != nil {
|
||||
conditions.MarkFalse(obj, sourcev1.SourceVerifiedCondition, meta.FailedReason, "Signature verification of commit %q failed: %s", commit.Hash.String(), err)
|
||||
r.Eventf(obj, events.EventSeverityError, "InvalidCommitSignature", "Signature verification of commit %q failed: %s", commit.Hash.String(), err)
|
||||
// Return error in the hope the secret changes
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Record deleted status
|
||||
r.recordReadiness(ctx, repository)
|
||||
|
||||
// Remove our finalizer from the list and update it
|
||||
controllerutil.RemoveFinalizer(&repository, sourcev1.SourceFinalizer)
|
||||
if err := r.Update(ctx, &repository); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Stop reconciliation as the object is being deleted
|
||||
return ctrl.Result{}, nil
|
||||
conditions.MarkTrue(obj, sourcev1.SourceVerifiedCondition, meta.SucceededReason, "Verified signature of commit %q", commit.Hash.String())
|
||||
r.Eventf(obj, events.EventSeverityInfo, "VerifiedCommit", "Verified signature of commit %q", commit.Hash.String())
|
||||
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
|
||||
}
|
||||
|
||||
// resetStatus returns a modified v1beta1.GitRepository and a boolean indicating
|
||||
// if the status field has been reset.
|
||||
func (r *GitRepositoryReconciler) resetStatus(repository sourcev1.GitRepository) (sourcev1.GitRepository, bool) {
|
||||
// We do not have an artifact, or it does no longer exist
|
||||
if repository.GetArtifact() == nil || !r.Storage.ArtifactExist(*repository.GetArtifact()) {
|
||||
repository = sourcev1.GitRepositoryProgressing(repository)
|
||||
repository.Status.Artifact = nil
|
||||
return repository, true
|
||||
// garbageCollect performs a garbage collection for the given v1beta1.GitRepository. It removes all but the current
|
||||
// artifact except for when the deletion timestamp is set, which will result in the removal of all artifacts for the
|
||||
// resource.
|
||||
func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourcev1.GitRepository) error {
|
||||
if !obj.DeletionTimestamp.IsZero() {
|
||||
if err := r.Storage.RemoveAll(r.Storage.NewArtifactFor(obj.Kind, obj.GetObjectMeta(), "", "*")); err != nil {
|
||||
r.Eventf(obj, events.EventSeverityError, "GarbageCollectionFailed",
|
||||
"Garbage collection for deleted resource failed: %s", err)
|
||||
return err
|
||||
}
|
||||
if repository.Generation != repository.Status.ObservedGeneration {
|
||||
return sourcev1.GitRepositoryProgressing(repository), true
|
||||
obj.Status.Artifact = nil
|
||||
// TODO(hidde): we should only push this event if we actually garbage collected something
|
||||
r.Eventf(obj, events.EventSeverityInfo, "GarbageCollectionSucceeded",
|
||||
"Garbage collected artifacts for deleted resource")
|
||||
return nil
|
||||
}
|
||||
return repository, false
|
||||
}
|
||||
|
||||
// gc performs a garbage collection for the given v1beta1.GitRepository.
|
||||
// It removes all but the current artifact except for when the
|
||||
// deletion timestamp is set, which will result in the removal of
|
||||
// all artifacts for the resource.
|
||||
func (r *GitRepositoryReconciler) gc(repository sourcev1.GitRepository) error {
|
||||
if !repository.DeletionTimestamp.IsZero() {
|
||||
return r.Storage.RemoveAll(r.Storage.NewArtifactFor(repository.Kind, repository.GetObjectMeta(), "", "*"))
|
||||
if obj.GetArtifact() != nil {
|
||||
if err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
|
||||
r.Eventf(obj, events.EventSeverityError, "GarbageCollectionFailed", "Garbage collection of old artifacts failed: %s", err)
|
||||
return err
|
||||
}
|
||||
if repository.GetArtifact() != nil {
|
||||
return r.Storage.RemoveAllButCurrent(*repository.GetArtifact())
|
||||
// TODO(hidde): we should only push this event if we actually garbage collected something
|
||||
r.Eventf(obj, events.EventSeverityInfo, "GarbageCollectionSucceeded", "Garbage collected old artifacts")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// event emits a Kubernetes event and forwards the event to notification controller if configured
|
||||
func (r *GitRepositoryReconciler) event(ctx context.Context, repository sourcev1.GitRepository, severity, msg string) {
|
||||
if r.EventRecorder != nil {
|
||||
r.EventRecorder.Eventf(&repository, corev1.EventTypeNormal, severity, msg)
|
||||
}
|
||||
if r.ExternalEventRecorder != nil {
|
||||
r.ExternalEventRecorder.Eventf(&repository, corev1.EventTypeNormal, severity, msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *GitRepositoryReconciler) recordReadiness(ctx context.Context, repository sourcev1.GitRepository) {
|
||||
log := ctrl.LoggerFrom(ctx)
|
||||
if r.MetricsRecorder == nil {
|
||||
return
|
||||
}
|
||||
objRef, err := reference.GetReference(r.Scheme, &repository)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to record readiness metric")
|
||||
return
|
||||
}
|
||||
if rc := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil {
|
||||
r.MetricsRecorder.RecordCondition(*objRef, *rc, !repository.DeletionTimestamp.IsZero())
|
||||
} else {
|
||||
r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
|
||||
Type: meta.ReadyCondition,
|
||||
Status: metav1.ConditionUnknown,
|
||||
}, !repository.DeletionTimestamp.IsZero())
|
||||
}
|
||||
}
|
||||
|
||||
func (r *GitRepositoryReconciler) recordSuspension(ctx context.Context, gitrepository sourcev1.GitRepository) {
|
||||
if r.MetricsRecorder == nil {
|
||||
return
|
||||
}
|
||||
log := ctrl.LoggerFrom(ctx)
|
||||
|
||||
objRef, err := reference.GetReference(r.Scheme, &gitrepository)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to record suspended metric")
|
||||
return
|
||||
}
|
||||
|
||||
if !gitrepository.DeletionTimestamp.IsZero() {
|
||||
r.MetricsRecorder.RecordSuspend(*objRef, false)
|
||||
} else {
|
||||
r.MetricsRecorder.RecordSuspend(*objRef, gitrepository.Spec.Suspend)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *GitRepositoryReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.GitRepositoryStatus) error {
|
||||
var repository sourcev1.GitRepository
|
||||
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
patch := client.MergeFrom(repository.DeepCopy())
|
||||
repository.Status = newStatus
|
||||
|
||||
return r.Status().Patch(ctx, &repository, patch)
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -30,6 +30,7 @@ import (
|
|||
"helm.sh/helm/v3/pkg/getter"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/record"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/envtest"
|
||||
|
@ -119,10 +120,10 @@ var _ = BeforeSuite(func() {
|
|||
|
||||
err = (&GitRepositoryReconciler{
|
||||
Client: k8sManager.GetClient(),
|
||||
Scheme: scheme.Scheme,
|
||||
EventRecorder: record.NewFakeRecorder(32),
|
||||
Storage: ginkgoTestStorage,
|
||||
}).SetupWithManager(k8sManager)
|
||||
Expect(err).ToNot(HaveOccurred(), "failed to setup GtRepositoryReconciler")
|
||||
Expect(err).ToNot(HaveOccurred(), "failed to setup GitRepositoryReconciler")
|
||||
|
||||
err = (&HelmRepositoryReconciler{
|
||||
Client: k8sManager.GetClient(),
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/tools/record"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
|
||||
"github.com/fluxcd/pkg/runtime/controller"
|
||||
|
@ -87,13 +88,14 @@ func TestMain(m *testing.M) {
|
|||
|
||||
testMetricsH = controller.MustMakeMetrics(testEnv)
|
||||
|
||||
//if err := (&GitRepositoryReconciler{
|
||||
// Client: testEnv,
|
||||
// Metrics: testMetricsH,
|
||||
// Storage: testStorage,
|
||||
//}).SetupWithManager(testEnv); err != nil {
|
||||
// panic(fmt.Sprintf("Failed to start GitRepositoryReconciler: %v", err))
|
||||
//}
|
||||
if err := (&GitRepositoryReconciler{
|
||||
Client: testEnv,
|
||||
EventRecorder: record.NewFakeRecorder(32),
|
||||
Metrics: testMetricsH,
|
||||
Storage: testStorage,
|
||||
}).SetupWithManager(testEnv); err != nil {
|
||||
panic(fmt.Sprintf("Failed to start GitRepositoryReconciler: %v", err))
|
||||
}
|
||||
|
||||
go func() {
|
||||
fmt.Println("Starting the test environment")
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
**.txt
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
apiVersion: v1
|
||||
kind: Namespace
|
||||
metadata:
|
||||
name: dummy
|
2
go.mod
2
go.mod
|
@ -38,6 +38,7 @@ require (
|
|||
k8s.io/api v0.23.1
|
||||
k8s.io/apimachinery v0.23.1
|
||||
k8s.io/client-go v0.23.1
|
||||
k8s.io/utils v0.0.0-20211208161948-7d6a63dca704
|
||||
sigs.k8s.io/controller-runtime v0.11.0
|
||||
sigs.k8s.io/yaml v1.3.0
|
||||
)
|
||||
|
@ -200,7 +201,6 @@ require (
|
|||
k8s.io/klog/v2 v2.30.0 // indirect
|
||||
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
|
||||
k8s.io/kubectl v0.22.4 // indirect
|
||||
k8s.io/utils v0.0.0-20211208161948-7d6a63dca704 // indirect
|
||||
oras.land/oras-go v0.4.0 // indirect
|
||||
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
|
||||
sigs.k8s.io/kustomize/api v0.10.1 // indirect
|
||||
|
|
5
main.go
5
main.go
|
@ -166,10 +166,9 @@ func main() {
|
|||
|
||||
if err = (&controllers.GitRepositoryReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Scheme: mgr.GetScheme(),
|
||||
Storage: storage,
|
||||
EventRecorder: eventRecorder,
|
||||
MetricsRecorder: metricsH.MetricsRecorder,
|
||||
Metrics: metricsH,
|
||||
Storage: storage,
|
||||
}).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{
|
||||
MaxConcurrentReconciles: concurrent,
|
||||
DependencyRequeueInterval: requeueDependency,
|
||||
|
|
Loading…
Reference in New Issue