Rewrite `GitRepositoryReconciler` to new standards

This commit rewrites the `GitRepositoryReconciler` to new standards,
while implementing the newly introduced Condition types, and trying
to adhere better to Kubernetes API conventions.

More specifically it introduces:

- Implementation of more explicit Condition types to highlight
  abnormalities.
- Extensive usage of the `conditions` subpackage from `runtime`.
- Better and more conflict-resilient (status)patching of reconciled
  objects using the `patch` subpackage from runtime.
- Proper implementation of kstatus' `Reconciling` and `Stalled`
  conditions.
- First (integration) tests that solely rely on `testenv` and do not
  use Ginkgo.

There are a couple of TODOs marked in-code, these are suggestions for
the future and should be non-blocking.
In addition to the TODOs, more complex and/or edge-case test scenarios
may be added as well.

Signed-off-by: Hidde Beydals <hello@hidde.co>
This commit is contained in:
Hidde Beydals 2021-07-30 14:19:39 +02:00
parent 912e59da1f
commit 08ce0c95fc
11 changed files with 1524 additions and 1109 deletions

View File

@ -18,8 +18,6 @@ package v1beta1
import ( import (
"github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
@ -203,48 +201,6 @@ const (
GitOperationFailedReason string = "GitOperationFailed" GitOperationFailedReason string = "GitOperationFailed"
) )
// GitRepositoryProgressing resets the conditions of the GitRepository to
// metav1.Condition of type meta.ReadyCondition with status 'Unknown' and
// meta.ProgressingReason reason and message. It returns the modified
// GitRepository.
func GitRepositoryProgressing(repository GitRepository) GitRepository {
repository.Status.ObservedGeneration = repository.Generation
repository.Status.URL = ""
repository.Status.Conditions = []metav1.Condition{}
conditions.MarkUnknown(&repository, meta.ReadyCondition, meta.ProgressingReason, "reconciliation in progress")
return repository
}
// GitRepositoryReady sets the given Artifact and URL on the GitRepository and
// sets the meta.ReadyCondition to 'True', with the given reason and message. It
// returns the modified GitRepository.
func GitRepositoryReady(repository GitRepository, artifact Artifact, includedArtifacts []*Artifact, url, reason, message string) GitRepository {
repository.Status.Artifact = &artifact
repository.Status.IncludedArtifacts = includedArtifacts
repository.Status.URL = url
conditions.MarkTrue(&repository, meta.ReadyCondition, reason, message)
return repository
}
// GitRepositoryNotReady sets the meta.ReadyCondition on the given GitRepository
// to 'False', with the given reason and message. It returns the modified
// GitRepository.
func GitRepositoryNotReady(repository GitRepository, reason, message string) GitRepository {
conditions.MarkFalse(&repository, meta.ReadyCondition, reason, message)
return repository
}
// GitRepositoryReadyMessage returns the message of the metav1.Condition of type
// meta.ReadyCondition with status 'True' if present, or an empty string.
func GitRepositoryReadyMessage(repository GitRepository) string {
if c := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); c != nil {
if c.Status == metav1.ConditionTrue {
return c.Message
}
}
return ""
}
// GetConditions returns the status conditions of the object. // GetConditions returns the status conditions of the object.
func (in GitRepository) GetConditions() []metav1.Condition { func (in GitRepository) GetConditions() []metav1.Condition {
return in.Status.Conditions return in.Status.Conditions

View File

@ -20,19 +20,14 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"path/filepath"
"strings" "strings"
"time" "time"
securejoin "github.com/cyphar/filepath-securejoin" securejoin "github.com/cyphar/filepath-securejoin"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
kuberecorder "k8s.io/client-go/tools/record" kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
@ -41,14 +36,16 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/predicates" "github.com/fluxcd/pkg/runtime/predicates"
"github.com/fluxcd/source-controller/pkg/sourceignore"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/pkg/git" "github.com/fluxcd/source-controller/pkg/git"
"github.com/fluxcd/source-controller/pkg/git/strategy" "github.com/fluxcd/source-controller/pkg/git/strategy"
"github.com/fluxcd/source-controller/pkg/sourceignore"
) )
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
@ -59,12 +56,12 @@ import (
// GitRepositoryReconciler reconciles a GitRepository object // GitRepositoryReconciler reconciles a GitRepository object
type GitRepositoryReconciler struct { type GitRepositoryReconciler struct {
client.Client client.Client
requeueDependency time.Duration helper.Events
Scheme *runtime.Scheme helper.Metrics
Storage *Storage Storage *Storage
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *events.Recorder requeueDependency time.Duration
MetricsRecorder *metrics.Recorder
} }
type GitRepositoryReconcilerOptions struct { type GitRepositoryReconcilerOptions struct {
@ -87,410 +84,505 @@ func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, o
Complete(r) Complete(r)
} }
func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now() start := time.Now()
log := logr.FromContext(ctx) log := ctrl.LoggerFrom(ctx)
var repository sourcev1.GitRepository // Fetch the GitRepository
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil { obj := &sourcev1.GitRepository{}
if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err) return ctrl.Result{}, client.IgnoreNotFound(err)
} }
// Record suspended status metric // Record suspended status metric
defer r.recordSuspension(ctx, repository) r.RecordSuspend(ctx, obj, obj.Spec.Suspend)
// Add our finalizer if it does not exist // Return early if the object is suspended
if !controllerutil.ContainsFinalizer(&repository, sourcev1.SourceFinalizer) { if obj.Spec.Suspend {
controllerutil.AddFinalizer(&repository, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &repository); err != nil {
log.Error(err, "unable to register finalizer")
return ctrl.Result{}, err
}
}
// Examine if the object is under deletion
if !repository.ObjectMeta.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, repository)
}
// Return early if the object is suspended.
if repository.Spec.Suspend {
log.Info("Reconciliation is suspended for this object") log.Info("Reconciliation is suspended for this object")
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
// check dependencies // Initialize the patch helper
if len(repository.Spec.Include) > 0 { patchHelper, err := patch.NewHelper(obj, r.Client)
if err := r.checkDependencies(repository); err != nil {
repository = sourcev1.GitRepositoryNotReady(repository, "DependencyNotReady", err.Error())
if err := r.updateStatus(ctx, req, repository.Status); err != nil {
log.Error(err, "unable to update status for dependency not ready")
return ctrl.Result{Requeue: true}, err
}
// we can't rely on exponential backoff because it will prolong the execution too much,
// instead we requeue on a fix interval.
msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String())
log.Info(msg)
r.event(ctx, repository, events.EventSeverityInfo, msg)
r.recordReadiness(ctx, repository)
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
}
log.Info("All dependencies area ready, proceeding with reconciliation")
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil { if err != nil {
return ctrl.Result{}, err return ctrl.Result{}, err
} }
defer r.MetricsRecorder.RecordDuration(*objRef, start)
// Always attempt to patch the object and status after each reconciliation
defer func() {
// Record the value of the reconciliation request, if any
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
obj.Status.SetLastHandledReconcileRequest(v)
} }
// set initial status // Summarize the Ready condition based on abnormalities that may have been observed.
if resetRepository, ok := r.resetStatus(repository); ok { conditions.SetSummary(obj,
repository = resetRepository meta.ReadyCondition,
if err := r.updateStatus(ctx, req, repository.Status); err != nil { conditions.WithConditions(
log.Error(err, "unable to update status") sourcev1.IncludeUnavailableCondition,
return ctrl.Result{Requeue: true}, err sourcev1.SourceVerifiedCondition,
} sourcev1.CheckoutFailedCondition,
r.recordReadiness(ctx, repository) sourcev1.ArtifactOutdatedCondition,
sourcev1.ArtifactUnavailableCondition,
),
conditions.WithNegativePolarityConditions(
sourcev1.ArtifactUnavailableCondition,
sourcev1.CheckoutFailedCondition,
sourcev1.SourceVerifiedCondition,
sourcev1.IncludeUnavailableCondition,
sourcev1.ArtifactOutdatedCondition,
),
)
// Patch the object, ignoring conflicts on the conditions owned by this controller
patchOpts := []patch.Option{
patch.WithOwnedConditions{
Conditions: []string{
sourcev1.ArtifactUnavailableCondition,
sourcev1.CheckoutFailedCondition,
sourcev1.IncludeUnavailableCondition,
sourcev1.ArtifactOutdatedCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
},
},
} }
// record the value of the reconciliation request, if any // Determine if the resource is still being reconciled, or if it has stalled, and record this observation
// TODO(hidde): would be better to defer this in combination with if retErr == nil && (result.IsZero() || !result.Requeue) {
// always patching the status sub-resource after a reconciliation. // We are no longer reconciling
if v, ok := meta.ReconcileAnnotationValue(repository.GetAnnotations()); ok { conditions.Delete(obj, meta.ReconcilingCondition)
repository.Status.SetLastHandledReconcileRequest(v)
// We have now observed this generation
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
readyCondition := conditions.Get(obj, meta.ReadyCondition)
switch readyCondition.Status {
case metav1.ConditionFalse:
// As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled
conditions.MarkStalled(obj, readyCondition.Reason, readyCondition.Message)
case metav1.ConditionTrue:
// As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled
conditions.Delete(obj, meta.StalledCondition)
}
} }
// purge old artifacts from storage // Finally, patch the resource
if err := r.gc(repository); err != nil { if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
log.Error(err, "unable to purge old artifacts") retErr = kerrors.NewAggregate([]error{retErr, err})
} }
// reconcile repository by pulling the latest Git commit // Always record readiness and duration metrics
reconciledRepository, reconcileErr := r.reconcile(ctx, *repository.DeepCopy()) r.Metrics.RecordReadiness(ctx, obj)
r.Metrics.RecordDuration(ctx, obj, start)
}()
// update status with the reconciliation result // Add finalizer first if not exist to avoid the race condition
if err := r.updateStatus(ctx, req, reconciledRepository.Status); err != nil { // between init and delete
log.Error(err, "unable to update status") if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) {
return ctrl.Result{Requeue: true}, err controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer)
return ctrl.Result{Requeue: true}, nil
} }
// if reconciliation failed, record the failure and requeue immediately // Examine if the object is under deletion
if reconcileErr != nil { if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
r.event(ctx, reconciledRepository, events.EventSeverityError, reconcileErr.Error()) return r.reconcileDelete(ctx, obj)
r.recordReadiness(ctx, reconciledRepository)
return ctrl.Result{Requeue: true}, reconcileErr
} }
// emit revision change event // Reconcile actual object
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision { return r.reconcile(ctx, obj)
r.event(ctx, reconciledRepository, events.EventSeverityInfo, sourcev1.GitRepositoryReadyMessage(reconciledRepository))
}
r.recordReadiness(ctx, reconciledRepository)
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
repository.GetInterval().Duration.String(),
))
return ctrl.Result{RequeueAfter: repository.GetInterval().Duration}, nil
} }
func (r *GitRepositoryReconciler) checkDependencies(repository sourcev1.GitRepository) error { // reconcile steps through the actual reconciliation tasks for the object, it returns early on the first step that
for _, d := range repository.Spec.Include { // produces an error.
dName := types.NamespacedName{Name: d.GitRepositoryRef.Name, Namespace: repository.Namespace} func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.GitRepository) (ctrl.Result, error) {
var gr sourcev1.GitRepository // Mark the resource as under reconciliation
err := r.Get(context.Background(), dName, &gr) conditions.MarkReconciling(obj, meta.ProgressingReason, "")
// Reconcile the storage data
if result, err := r.reconcileStorage(ctx, obj); err != nil || result.IsZero() {
return result, err
}
// Create temp dir for Git clone
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
if err != nil { if err != nil {
return fmt.Errorf("unable to get '%s' dependency: %w", dName, err) r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason, "Failed to create temporary directory: %s", err)
return ctrl.Result{}, err
}
defer os.RemoveAll(tmpDir)
// Reconcile the source from upstream
var artifact sourcev1.Artifact
if result, err := r.reconcileSource(ctx, obj, &artifact, tmpDir); err != nil || result.IsZero() {
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, err
} }
if len(gr.Status.Conditions) == 0 || gr.Generation != gr.Status.ObservedGeneration { // Reconcile includes from the storage
return fmt.Errorf("dependency '%s' is not ready", dName) var includes artifactSet
if result, err := r.reconcileInclude(ctx, obj, includes, tmpDir); err != nil || result.IsZero() {
return ctrl.Result{RequeueAfter: r.requeueDependency}, err
} }
if !apimeta.IsStatusConditionTrue(gr.Status.Conditions, meta.ReadyCondition) { // Reconcile the artifact to storage
return fmt.Errorf("dependency '%s' is not ready", dName) if result, err := r.reconcileArtifact(ctx, obj, artifact, includes, tmpDir); err != nil || result.IsZero() {
} return result, err
} }
return nil return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
} }
func (r *GitRepositoryReconciler) reconcile(ctx context.Context, repository sourcev1.GitRepository) (sourcev1.GitRepository, error) { // reconcileStorage ensures the current state of the storage matches the desired and previously observed state.
// create tmp dir for the Git clone //
tmpGit, err := os.MkdirTemp("", repository.Name) // All artifacts for the resource except for the current one are garbage collected from the storage.
if err != nil { // If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
err = fmt.Errorf("tmp dir error: %w", err) // If the object does not have an artifact in its Status object, a v1beta1.ArtifactUnavailableCondition is set.
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err // If the hostname of any of the URLs on the object do not match the current storage server hostname, they are updated.
} //
defer os.RemoveAll(tmpGit) // The caller should assume a failure if an error is returned, or the Result is zero.
func (r *GitRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.GitRepository) (ctrl.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
_ = r.garbageCollect(ctx, obj)
// determine auth method // Determine if the advertised artifact is still in storage
if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) {
obj.Status.Artifact = nil
obj.Status.URL = ""
}
// Record that we do not have an artifact
if obj.GetArtifact() == nil {
conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage")
return ctrl.Result{Requeue: true}, nil
}
conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
// Always update URLs to ensure hostname is up-to-date
// TODO(hidde): we may want to send out an event only if we notice the URL has changed
r.Storage.SetArtifactURL(obj.GetArtifact())
obj.Status.URL = r.Storage.SetHostname(obj.Status.URL)
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
}
// reconcileSource ensures the upstream Git repository can be reached and checked out using the declared configuration,
// and observes its state.
//
// The repository is checked out to the given dir using the defined configuration, and in case of an error during the
// checkout process (including transient errors), it records v1beta1.CheckoutFailedCondition=True and returns early.
// On a successful checkout it removes v1beta1.CheckoutFailedCondition, and compares the current revision of HEAD to the
// artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
// If instructed, the signature of the commit is verified if and recorded as v1beta1.SourceVerifiedCondition. If the
// signature can not be verified or the verification fails, the Condition=False and it returns early.
// If both the checkout and signature verification are successful, the given artifact pointer is set to a new artifact
// with the available metadata.
//
// The caller should assume a failure if an error is returned, or the Result is zero.
func (r *GitRepositoryReconciler) reconcileSource(ctx context.Context,
obj *sourcev1.GitRepository, artifact *sourcev1.Artifact, dir string) (ctrl.Result, error) {
// Configure authentication strategy to access the source
auth := &git.Auth{} auth := &git.Auth{}
if repository.Spec.SecretRef != nil { if obj.Spec.SecretRef != nil {
authStrategy, err := strategy.AuthSecretStrategyForURL( // Determine the auth strategy
repository.Spec.URL, authStrategy, err := strategy.AuthSecretStrategyForURL(obj.Spec.URL, git.CheckoutOptions{
git.CheckoutOptions{ GitImplementation: obj.Spec.GitImplementation,
GitImplementation: repository.Spec.GitImplementation, RecurseSubmodules: obj.Spec.RecurseSubmodules,
RecurseSubmodules: repository.Spec.RecurseSubmodules,
}) })
if err != nil { if err != nil {
return sourcev1.GitRepositoryNotReady(repository, sourcev1.AuthenticationFailedReason, err.Error()), err conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.AuthenticationFailedReason,
"Failed to get auth strategy for Git implementation %q: %s", obj.Spec.GitImplementation, err)
// Do not return error as recovery without changes is impossible
return ctrl.Result{}, nil
} }
// Attempt to retrieve secret
name := types.NamespacedName{ name := types.NamespacedName{
Namespace: repository.GetNamespace(), Namespace: obj.GetNamespace(),
Name: repository.Spec.SecretRef.Name, Name: obj.Spec.SecretRef.Name,
} }
var secret corev1.Secret var secret corev1.Secret
err = r.Client.Get(ctx, name, &secret) if err = r.Client.Get(ctx, name, &secret); err != nil {
if err != nil { conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.AuthenticationFailedReason,
err = fmt.Errorf("auth secret error: %w", err) "Failed to get secret '%s': %s", name.String(), err.Error())
return sourcev1.GitRepositoryNotReady(repository, sourcev1.AuthenticationFailedReason, err.Error()), err r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.AuthenticationFailedReason,
"Failed to get secret '%s': %s", name.String(), err.Error())
// Return error as the world as observed may change
return ctrl.Result{}, err
} }
// Configure strategy with secret
auth, err = authStrategy.Method(secret) auth, err = authStrategy.Method(secret)
if err != nil { if err != nil {
err = fmt.Errorf("auth error: %w", err) conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.AuthenticationFailedReason,
return sourcev1.GitRepositoryNotReady(repository, sourcev1.AuthenticationFailedReason, err.Error()), err "Failed to configure auth strategy for Git implementation %q: %s", obj.Spec.GitImplementation, err)
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.AuthenticationFailedReason,
"Failed to configure auth strategy for Git implementation %q: %s", obj.Spec.GitImplementation, err)
// Return error as the contents of the secret may change
return ctrl.Result{}, err
} }
} }
checkoutStrategy, err := strategy.CheckoutStrategyForRef( // Configure checkout strategy
repository.Spec.Reference, checkoutStrategy, err := strategy.CheckoutStrategyForRef(obj.Spec.Reference, git.CheckoutOptions{
git.CheckoutOptions{ GitImplementation: obj.Spec.GitImplementation,
GitImplementation: repository.Spec.GitImplementation, RecurseSubmodules: obj.Spec.RecurseSubmodules,
RecurseSubmodules: repository.Spec.RecurseSubmodules, })
},
)
if err != nil { if err != nil {
return sourcev1.GitRepositoryNotReady(repository, sourcev1.GitOperationFailedReason, err.Error()), err conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.GitOperationFailedReason,
"Failed to configure checkout strategy for Git implementation %q: %s", obj.Spec.GitImplementation, err)
// Do not return err as recovery without changes is impossible
return ctrl.Result{}, nil
} }
gitCtx, cancel := context.WithTimeout(ctx, repository.Spec.Timeout.Duration) // Checkout HEAD of reference in object
gitCtx, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel() defer cancel()
commit, revision, err := checkoutStrategy.Checkout(gitCtx, dir, obj.Spec.URL, auth)
commit, revision, err := checkoutStrategy.Checkout(gitCtx, tmpGit, repository.Spec.URL, auth)
if err != nil { if err != nil {
return sourcev1.GitRepositoryNotReady(repository, sourcev1.GitOperationFailedReason, err.Error()), err conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.GitOperationFailedReason,
"Failed to checkout and determine revision: %s", err)
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.GitOperationFailedReason,
"Failed to checkout and determine revision: %s", err)
// Coin flip on transient or persistent error, return error and hope for the best
return ctrl.Result{}, err
}
r.Eventf(ctx, obj, events.EventSeverityInfo, sourcev1.GitOperationSucceedReason,
"Cloned repository '%s' and checked out revision '%s'", obj.Spec.URL, revision)
conditions.Delete(obj, sourcev1.CheckoutFailedCondition)
// Verify commit signature
if result, err := r.verifyCommitSignature(ctx, obj, commit); err != nil || result.IsZero() {
return result, err
} }
artifact := r.Storage.NewArtifactFor(repository.Kind, repository.GetObjectMeta(), revision, fmt.Sprintf("%s.tar.gz", commit.Hash())) // Create potential new artifact with current available metadata
*artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", commit.Hash()))
// copy all included repository into the artifact // Mark observations about the revision on the object
includedArtifacts := []*sourcev1.Artifact{} if !obj.GetArtifact().HasRevision(revision) {
for _, incl := range repository.Spec.Include { conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '%s'", revision)
dName := types.NamespacedName{Name: incl.GitRepositoryRef.Name, Namespace: repository.Namespace}
var gr sourcev1.GitRepository
err := r.Get(context.Background(), dName, &gr)
if err != nil {
return sourcev1.GitRepositoryNotReady(repository, "DependencyNotReady", err.Error()), err
} }
includedArtifacts = append(includedArtifacts, gr.GetArtifact()) return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
}
// reconcileArtifact archives a new artifact to the storage, if the current observation on the object does not match the
// given data.
//
// The inspection of the given data to the object is differed, ensuring any stale observations as
// v1beta1.ArtifactUnavailableCondition and v1beta1.ArtifactOutdatedCondition are always deleted.
// If the given artifact and/or includes do not differ from the object's current, it returns early.
// Source ignore patterns are loaded, and the given directory is archived.
// On a successful archive, the artifact and includes in the status of the given object are set, and the symlink in the
// storage is updated to its path.
//
// The caller should assume a failure if an error is returned, or the Result is zero.
func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.GitRepository, artifact sourcev1.Artifact, includes artifactSet, dir string) (ctrl.Result, error) {
// Always restore the Ready condition in case it got removed due to a transient error
defer func() {
if obj.GetArtifact() != nil {
conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
}
if obj.GetArtifact().HasRevision(artifact.Revision) && !includes.Diff(obj.Status.IncludedArtifacts) {
conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason,
"Stored artifact for revision '%s'", artifact.Revision)
}
}()
// The artifact is up-to-date
if obj.GetArtifact().HasRevision(artifact.Revision) && !includes.Diff(obj.Status.IncludedArtifacts) {
ctrl.LoggerFrom(ctx).Info("Artifact is up-to-date")
return ctrl.Result{RequeueAfter: obj.GetInterval().Duration}, nil
} }
// return early on unchanged revision and unchanged included repositories // Ensure target path exists and is a directory
if apimeta.IsStatusConditionTrue(repository.Status.Conditions, meta.ReadyCondition) && repository.GetArtifact().HasRevision(artifact.Revision) && !hasArtifactUpdated(repository.Status.IncludedArtifacts, includedArtifacts) { if f, err := os.Stat(dir); err != nil {
if artifact.URL != repository.GetArtifact().URL { ctrl.LoggerFrom(ctx).Error(err, "failed to stat source path")
r.Storage.SetArtifactURL(repository.GetArtifact()) return ctrl.Result{}, err
repository.Status.URL = r.Storage.SetHostname(repository.Status.URL) } else if !f.IsDir() {
} ctrl.LoggerFrom(ctx).Error(err, fmt.Sprintf("source path '%s' is not a directory", dir))
return repository, nil return ctrl.Result{}, err
} }
// verify PGP signature // Ensure artifact directory exists and acquire lock
if repository.Spec.Verification != nil { if err := r.Storage.MkdirAll(artifact); err != nil {
publicKeySecret := types.NamespacedName{ ctrl.LoggerFrom(ctx).Error(err, "failed to create artifact directory")
Namespace: repository.Namespace, return ctrl.Result{}, err
Name: repository.Spec.Verification.SecretRef.Name,
} }
var secret corev1.Secret
if err := r.Client.Get(ctx, publicKeySecret, &secret); err != nil {
err = fmt.Errorf("PGP public keys secret error: %w", err)
return sourcev1.GitRepositoryNotReady(repository, sourcev1.VerificationFailedReason, err.Error()), err
}
err := commit.Verify(secret)
if err != nil {
return sourcev1.GitRepositoryNotReady(repository, sourcev1.VerificationFailedReason, err.Error()), err
}
}
// create artifact dir
err = r.Storage.MkdirAll(artifact)
if err != nil {
err = fmt.Errorf("mkdir dir error: %w", err)
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
}
for i, incl := range repository.Spec.Include {
toPath, err := securejoin.SecureJoin(tmpGit, incl.GetToPath())
if err != nil {
return sourcev1.GitRepositoryNotReady(repository, "DependencyNotReady", err.Error()), err
}
err = r.Storage.CopyToPath(includedArtifacts[i], incl.GetFromPath(), toPath)
if err != nil {
return sourcev1.GitRepositoryNotReady(repository, "DependencyNotReady", err.Error()), err
}
}
// acquire lock
unlock, err := r.Storage.Lock(artifact) unlock, err := r.Storage.Lock(artifact)
if err != nil { if err != nil {
err = fmt.Errorf("unable to acquire lock: %w", err) ctrl.LoggerFrom(ctx).Error(err, "failed to acquire lock for artifact")
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err return ctrl.Result{}, err
} }
defer unlock() defer unlock()
// archive artifact and check integrity // Load ignore rules for archiving
ignoreDomain := strings.Split(tmpGit, string(filepath.Separator)) ps, err := sourceignore.LoadIgnorePatterns(dir, nil)
ps, err := sourceignore.LoadIgnorePatterns(tmpGit, ignoreDomain)
if err != nil { if err != nil {
err = fmt.Errorf(".sourceignore error: %w", err) r.Eventf(ctx, obj, events.EventSeverityError,
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err "SourceIgnoreError", "Failed to load source ignore patterns from repository: %s", err)
return ctrl.Result{}, err
} }
if repository.Spec.Ignore != nil { if obj.Spec.Ignore != nil {
ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*repository.Spec.Ignore), ignoreDomain)...) ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
}
if err := r.Storage.Archive(&artifact, tmpGit, SourceIgnoreFilter(ps, ignoreDomain)); err != nil {
err = fmt.Errorf("storage archive error: %w", err)
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
} }
// update latest symlink // Archive directory to storage
if err := r.Storage.Archive(&artifact, dir, SourceIgnoreFilter(ps, nil)); err != nil {
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason,
"Unable to archive artifact to storage: %s", err)
return ctrl.Result{}, err
}
r.Events.EventWithMetaf(ctx, obj, map[string]string{
"revision": artifact.Revision,
"checksum": artifact.Checksum,
}, events.EventSeverityInfo, "NewArtifact", "Stored artifact for revision '%s'", artifact.Revision)
// Record it on the object
obj.Status.Artifact = artifact.DeepCopy()
obj.Status.IncludedArtifacts = includes
// Update symlink on a "best effort" basis
url, err := r.Storage.Symlink(artifact, "latest.tar.gz") url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
if err != nil { if err != nil {
err = fmt.Errorf("storage symlink error: %w", err) r.Events.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason,
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err "Failed to update status URL symlink: %s", err)
} }
if url != "" {
message := fmt.Sprintf("Fetched revision: %s", artifact.Revision) obj.Status.URL = url
return sourcev1.GitRepositoryReady(repository, artifact, includedArtifacts, url, sourcev1.GitOperationSucceedReason, message), nil }
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
} }
func (r *GitRepositoryReconciler) reconcileDelete(ctx context.Context, repository sourcev1.GitRepository) (ctrl.Result, error) { // reconcileInclude reconciles the declared includes from the object by copying their artifact (sub)contents to the
if err := r.gc(repository); err != nil { // declared paths in the given directory.
r.event(ctx, repository, events.EventSeverityError, //
fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error())) // If an include is unavailable, it marks the object with v1beta1.IncludeUnavailableCondition and returns early.
// If the copy operations are successful, it deletes the v1beta1.IncludeUnavailableCondition from the object.
// If the artifactSet differs from the current set, it marks the object with v1beta1.ArtifactOutdatedCondition.
//
// The caller should assume a failure if an error is returned, or the Result is zero.
func (r *GitRepositoryReconciler) reconcileInclude(ctx context.Context, obj *sourcev1.GitRepository, artifacts artifactSet, dir string) (ctrl.Result, error) {
artifacts = make(artifactSet, len(obj.Spec.Include))
for i, incl := range obj.Spec.Include {
// Do this first as it is much cheaper than copy operations
toPath, err := securejoin.SecureJoin(dir, incl.GetToPath())
if err != nil {
conditions.MarkTrue(obj, sourcev1.IncludeUnavailableCondition, "IllegalPath",
"Path calculation for include %q failed: %s", incl.GitRepositoryRef.Name, err.Error())
return ctrl.Result{}, err
}
// Retrieve the included GitRepository
dep := &sourcev1.GitRepository{}
if err := r.Get(ctx, types.NamespacedName{Namespace: obj.Namespace, Name: incl.GitRepositoryRef.Name}, dep); err != nil {
conditions.MarkTrue(obj, sourcev1.IncludeUnavailableCondition, "NotFound",
"Could not get resource for include %q: %s", incl.GitRepositoryRef.Name, err.Error())
return ctrl.Result{}, err
}
// Confirm include has an artifact
if dep.GetArtifact() == nil {
conditions.MarkTrue(obj, sourcev1.IncludeUnavailableCondition, "NoArtifact",
"No artifact available for include %q", incl.GitRepositoryRef.Name)
return ctrl.Result{}, nil
}
// Copy artifact (sub)contents to configured directory
if err := r.Storage.CopyToPath(dep.GetArtifact(), incl.GetFromPath(), toPath); err != nil {
conditions.MarkTrue(obj, sourcev1.IncludeUnavailableCondition, "CopyFailure",
"Failed to copy %q include from %s to %s: %s", incl.GitRepositoryRef.Name, incl.GetFromPath(), incl.GetToPath(), err.Error())
r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.IncludeUnavailableCondition,
"Failed to copy %q include from %s to %s: %s", incl.GitRepositoryRef.Name, incl.GetFromPath(), incl.GetToPath(), err.Error())
return ctrl.Result{}, err
}
artifacts[i] = dep.GetArtifact().DeepCopy()
}
// We now know all includes are available
conditions.Delete(obj, sourcev1.IncludeUnavailableCondition)
// Observe if the artifacts still match the previous included ones
if artifacts.Diff(obj.Status.IncludedArtifacts) {
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "IncludeChange", "Included artifacts differ from last observed includes")
}
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
}
// reconcileDelete handles the delete of an object. It first garbage collects all artifacts for the object from the
// artifact storage, if successful, the finalizer is removed from the object.
func (r *GitRepositoryReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.GitRepository) (ctrl.Result, error) {
// Garbage collect the resource's artifacts
if err := r.garbageCollect(ctx, obj); err != nil {
// Return the error so we retry the failed garbage collection // Return the error so we retry the failed garbage collection
return ctrl.Result{}, err return ctrl.Result{}, err
} }
// Record deleted status // Remove our finalizer from the list
r.recordReadiness(ctx, repository) controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
// Remove our finalizer from the list and update it
controllerutil.RemoveFinalizer(&repository, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &repository); err != nil {
return ctrl.Result{}, err
}
// Stop reconciliation as the object is being deleted // Stop reconciliation as the object is being deleted
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
// resetStatus returns a modified v1beta1.GitRepository and a boolean indicating // verifyCommitSignature verifies the signature of the given commit if a verification mode is configured on the object.
// if the status field has been reset. func (r *GitRepositoryReconciler) verifyCommitSignature(ctx context.Context, obj *sourcev1.GitRepository, commit git.Commit) (ctrl.Result, error) {
func (r *GitRepositoryReconciler) resetStatus(repository sourcev1.GitRepository) (sourcev1.GitRepository, bool) { // Check if there is a commit verification is configured and remove any old observations if there is none
// We do not have an artifact, or it does no longer exist if obj.Spec.Verification == nil || obj.Spec.Verification.Mode == "" {
if repository.GetArtifact() == nil || !r.Storage.ArtifactExist(*repository.GetArtifact()) { conditions.Delete(obj, sourcev1.SourceVerifiedCondition)
repository = sourcev1.GitRepositoryProgressing(repository) return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
repository.Status.Artifact = nil
return repository, true
} }
if repository.Generation != repository.Status.ObservedGeneration {
return sourcev1.GitRepositoryProgressing(repository), true // Get secret with GPG data
publicKeySecret := types.NamespacedName{
Namespace: obj.Namespace,
Name: obj.Spec.Verification.SecretRef.Name,
} }
return repository, false var secret corev1.Secret
if err := r.Client.Get(ctx, publicKeySecret, &secret); err != nil {
conditions.MarkFalse(obj, sourcev1.SourceVerifiedCondition, meta.FailedReason, "PGP public keys secret error: %s", err.Error())
r.Eventf(ctx, obj, events.EventSeverityError, "VerificationError", "PGP public keys secret error: %s", err.Error())
return ctrl.Result{}, err
}
// Verify commit with GPG data from secret
if err := commit.Verify(secret); err != nil {
conditions.MarkFalse(obj, sourcev1.SourceVerifiedCondition, meta.FailedReason, "Signature verification of commit %q failed: %s", commit.Hash(), err)
r.Eventf(ctx, obj, events.EventSeverityError, "InvalidCommitSignature", "Signature verification of commit %q failed: %s", commit.Hash(), err)
// Return error in the hope the secret changes
return ctrl.Result{}, err
}
conditions.MarkTrue(obj, sourcev1.SourceVerifiedCondition, meta.SucceededReason, "Verified signature of commit %q", commit.Hash())
r.Eventf(ctx, obj, events.EventSeverityInfo, "VerifiedCommit", "Verified signature of commit %q", commit.Hash())
return ctrl.Result{RequeueAfter: obj.Spec.Interval.Duration}, nil
} }
// gc performs a garbage collection for the given v1beta1.GitRepository. // garbageCollect performs a garbage collection for the given v1beta1.GitRepository. It removes all but the current
// It removes all but the current artifact except for when the // artifact except for when the deletion timestamp is set, which will result in the removal of all artifacts for the
// deletion timestamp is set, which will result in the removal of // resource.
// all artifacts for the resource. func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourcev1.GitRepository) error {
func (r *GitRepositoryReconciler) gc(repository sourcev1.GitRepository) error { if !obj.DeletionTimestamp.IsZero() {
if !repository.DeletionTimestamp.IsZero() { if err := r.Storage.RemoveAll(r.Storage.NewArtifactFor(obj.Kind, obj.GetObjectMeta(), "", "*")); err != nil {
return r.Storage.RemoveAll(r.Storage.NewArtifactFor(repository.Kind, repository.GetObjectMeta(), "", "*")) r.Eventf(ctx, obj, events.EventSeverityError, "GarbageCollectionFailed",
"Garbage collection for deleted resource failed: %s", err)
return err
} }
if repository.GetArtifact() != nil { obj.Status.Artifact = nil
return r.Storage.RemoveAllButCurrent(*repository.GetArtifact()) // TODO(hidde): we should only push this event if we actually garbage collected something
r.Eventf(ctx, obj, events.EventSeverityInfo, "GarbageCollectionSucceeded",
"Garbage collected artifacts for deleted resource")
return nil
}
if obj.GetArtifact() != nil {
if err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
r.Eventf(ctx, obj, events.EventSeverityError, "GarbageCollectionFailed", "Garbage collection of old artifacts failed: %s", err)
return err
}
// TODO(hidde): we should only push this event if we actually garbage collected something
r.Eventf(ctx, obj, events.EventSeverityInfo, "GarbageCollectionSucceeded", "Garbage collected old artifacts")
} }
return nil return nil
} }
// event emits a Kubernetes event and forwards the event to notification controller if configured
func (r *GitRepositoryReconciler) event(ctx context.Context, repository sourcev1.GitRepository, severity, msg string) {
log := logr.FromContext(ctx)
if r.EventRecorder != nil {
r.EventRecorder.Eventf(&repository, "Normal", severity, msg)
}
if r.ExternalEventRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
log.Error(err, "unable to send event")
return
}
if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
log.Error(err, "unable to send event")
return
}
}
}
func (r *GitRepositoryReconciler) recordReadiness(ctx context.Context, repository sourcev1.GitRepository) {
log := logr.FromContext(ctx)
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
log.Error(err, "unable to record readiness metric")
return
}
if rc := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, !repository.DeletionTimestamp.IsZero())
} else {
r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
Type: meta.ReadyCondition,
Status: metav1.ConditionUnknown,
}, !repository.DeletionTimestamp.IsZero())
}
}
func (r *GitRepositoryReconciler) recordSuspension(ctx context.Context, gitrepository sourcev1.GitRepository) {
if r.MetricsRecorder == nil {
return
}
log := logr.FromContext(ctx)
objRef, err := reference.GetReference(r.Scheme, &gitrepository)
if err != nil {
log.Error(err, "unable to record suspended metric")
return
}
if !gitrepository.DeletionTimestamp.IsZero() {
r.MetricsRecorder.RecordSuspend(*objRef, false)
} else {
r.MetricsRecorder.RecordSuspend(*objRef, gitrepository.Spec.Suspend)
}
}
func (r *GitRepositoryReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.GitRepositoryStatus) error {
var repository sourcev1.GitRepository
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil {
return err
}
patch := client.MergeFrom(repository.DeepCopy())
repository.Status = newStatus
return r.Status().Patch(ctx, &repository, patch)
}

File diff suppressed because it is too large Load Diff

View File

@ -24,6 +24,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/fluxcd/pkg/runtime/controller"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"helm.sh/helm/v3/pkg/getter" "helm.sh/helm/v3/pkg/getter"
@ -112,12 +113,14 @@ var _ = BeforeSuite(func(done Done) {
}) })
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
testEventsH = controller.MakeEvents(k8sManager, "source-controller-test", nil)
err = (&GitRepositoryReconciler{ err = (&GitRepositoryReconciler{
Client: k8sManager.GetClient(), Client: k8sManager.GetClient(),
Scheme: scheme.Scheme, Events: testEventsH,
Storage: ginkgoTestStorage, Storage: ginkgoTestStorage,
}).SetupWithManager(k8sManager) }).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred(), "failed to setup GtRepositoryReconciler") Expect(err).ToNot(HaveOccurred(), "failed to setup GitRepositoryReconciler")
err = (&HelmRepositoryReconciler{ err = (&HelmRepositoryReconciler{
Client: k8sManager.GetClient(), Client: k8sManager.GetClient(),

View File

@ -89,14 +89,14 @@ func TestMain(m *testing.M) {
testEventsH = controller.MakeEvents(testEnv, "source-controller-test", nil) testEventsH = controller.MakeEvents(testEnv, "source-controller-test", nil)
testMetricsH = controller.MustMakeMetrics(testEnv) testMetricsH = controller.MustMakeMetrics(testEnv)
//if err := (&GitRepositoryReconciler{ if err := (&GitRepositoryReconciler{
// Client: testEnv, Client: testEnv,
// Events: testEventsH, Events: testEventsH,
// Metrics: testMetricsH, Metrics: testMetricsH,
// Storage: testStorage, Storage: testStorage,
//}).SetupWithManager(testEnv); err != nil { }).SetupWithManager(testEnv); err != nil {
// panic(fmt.Sprintf("Failed to start GitRepositoryReconciler: %v", err)) panic(fmt.Sprintf("Failed to start GitRepositoryReconciler: %v", err))
//} }
go func() { go func() {
fmt.Println("Starting the test environment") fmt.Println("Starting the test environment")

View File

@ -0,0 +1 @@
**.txt

View File

View File

@ -0,0 +1,5 @@
---
apiVersion: v1
kind: Namespace
metadata:
name: dummy

3
go.mod
View File

@ -9,7 +9,7 @@ require (
github.com/blang/semver/v4 v4.0.0 github.com/blang/semver/v4 v4.0.0
github.com/cyphar/filepath-securejoin v0.2.2 github.com/cyphar/filepath-securejoin v0.2.2
github.com/fluxcd/pkg/apis/meta v0.11.0-rc.1 github.com/fluxcd/pkg/apis/meta v0.11.0-rc.1
github.com/fluxcd/pkg/gittestserver v0.3.0 github.com/fluxcd/pkg/gittestserver v0.3.2
github.com/fluxcd/pkg/gitutil v0.1.0 github.com/fluxcd/pkg/gitutil v0.1.0
github.com/fluxcd/pkg/helmtestserver v0.2.0 github.com/fluxcd/pkg/helmtestserver v0.2.0
github.com/fluxcd/pkg/lockedfile v0.1.0 github.com/fluxcd/pkg/lockedfile v0.1.0
@ -34,6 +34,7 @@ require (
k8s.io/api v0.21.2 k8s.io/api v0.21.2
k8s.io/apimachinery v0.21.3 k8s.io/apimachinery v0.21.3
k8s.io/client-go v0.21.2 k8s.io/client-go v0.21.2
k8s.io/utils v0.0.0-20210527160623-6fdb442a123b
sigs.k8s.io/controller-runtime v0.9.3 sigs.k8s.io/controller-runtime v0.9.3
sigs.k8s.io/yaml v1.2.0 sigs.k8s.io/yaml v1.2.0
) )

4
go.sum
View File

@ -231,8 +231,8 @@ github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fluxcd/pkg/apis/meta v0.11.0-rc.1 h1:RHHrztAFv9wmjM+Pk7Svt1UdD+1SdnQSp76MWFiM7Hg= github.com/fluxcd/pkg/apis/meta v0.11.0-rc.1 h1:RHHrztAFv9wmjM+Pk7Svt1UdD+1SdnQSp76MWFiM7Hg=
github.com/fluxcd/pkg/apis/meta v0.11.0-rc.1/go.mod h1:yUblM2vg+X8TE3A2VvJfdhkGmg+uqBlSPkLk7dxi0UM= github.com/fluxcd/pkg/apis/meta v0.11.0-rc.1/go.mod h1:yUblM2vg+X8TE3A2VvJfdhkGmg+uqBlSPkLk7dxi0UM=
github.com/fluxcd/pkg/gittestserver v0.3.0 h1:6aa30mybecBwBWaJ2IEk7pQzefWnjWjxkTSrHMHawvg= github.com/fluxcd/pkg/gittestserver v0.3.2 h1:oc1OoZ4b+kAu0vu/RT9wUwuQZxSqEjBOlQWYYA+YeLM=
github.com/fluxcd/pkg/gittestserver v0.3.0/go.mod h1:8j36Z6B0BuKNZZ6exAWoyDEpyQoFcjz1IX3WBT7PZNg= github.com/fluxcd/pkg/gittestserver v0.3.2/go.mod h1:8j36Z6B0BuKNZZ6exAWoyDEpyQoFcjz1IX3WBT7PZNg=
github.com/fluxcd/pkg/gitutil v0.1.0 h1:VO3kJY/CKOCO4ysDNqfdpTg04icAKBOSb3lbR5uE/IE= github.com/fluxcd/pkg/gitutil v0.1.0 h1:VO3kJY/CKOCO4ysDNqfdpTg04icAKBOSb3lbR5uE/IE=
github.com/fluxcd/pkg/gitutil v0.1.0/go.mod h1:Ybz50Ck5gkcnvF0TagaMwtlRy3X3wXuiri1HVsK5id4= github.com/fluxcd/pkg/gitutil v0.1.0/go.mod h1:Ybz50Ck5gkcnvF0TagaMwtlRy3X3wXuiri1HVsK5id4=
github.com/fluxcd/pkg/helmtestserver v0.2.0 h1:cE7YHDmrWI0hr9QpaaeQ0vQ16Z0IiqZKiINDpqdY610= github.com/fluxcd/pkg/helmtestserver v0.2.0 h1:cE7YHDmrWI0hr9QpaaeQ0vQ16Z0IiqZKiINDpqdY610=

42
main.go
View File

@ -25,6 +25,13 @@ import (
"strings" "strings"
"time" "time"
"github.com/fluxcd/pkg/runtime/client"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/leaderelection"
"github.com/fluxcd/pkg/runtime/logger"
"github.com/fluxcd/pkg/runtime/pprof"
"github.com/fluxcd/pkg/runtime/probes"
"github.com/go-logr/logr" "github.com/go-logr/logr"
flag "github.com/spf13/pflag" flag "github.com/spf13/pflag"
"helm.sh/helm/v3/pkg/getter" "helm.sh/helm/v3/pkg/getter"
@ -33,15 +40,6 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme" clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime"
crtlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"github.com/fluxcd/pkg/runtime/client"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/leaderelection"
"github.com/fluxcd/pkg/runtime/logger"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/pprof"
"github.com/fluxcd/pkg/runtime/probes"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
"github.com/fluxcd/source-controller/controllers" "github.com/fluxcd/source-controller/controllers"
@ -99,25 +97,24 @@ func main() {
flag.BoolVar(&watchAllNamespaces, "watch-all-namespaces", true, flag.BoolVar(&watchAllNamespaces, "watch-all-namespaces", true,
"Watch for custom resources in all namespaces, if set to false it will only watch the runtime namespace.") "Watch for custom resources in all namespaces, if set to false it will only watch the runtime namespace.")
flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second, "The interval at which failing dependencies are reevaluated.") flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second, "The interval at which failing dependencies are reevaluated.")
clientOptions.BindFlags(flag.CommandLine) clientOptions.BindFlags(flag.CommandLine)
logOptions.BindFlags(flag.CommandLine) logOptions.BindFlags(flag.CommandLine)
leaderElectionOptions.BindFlags(flag.CommandLine) leaderElectionOptions.BindFlags(flag.CommandLine)
flag.Parse() flag.Parse()
ctrl.SetLogger(logger.NewLogger(logOptions)) ctrl.SetLogger(logger.NewLogger(logOptions))
var eventRecorder *events.Recorder var eventRecorder *events.Recorder
if eventsAddr != "" { if eventsAddr != "" {
if er, err := events.NewRecorder(eventsAddr, controllerName); err != nil { er, err := events.NewRecorder(eventsAddr, controllerName)
if err != nil {
setupLog.Error(err, "unable to create event recorder") setupLog.Error(err, "unable to create event recorder")
os.Exit(1) os.Exit(1)
} else { }
eventRecorder = er eventRecorder = er
} }
}
metricsRecorder := metrics.NewRecorder()
crtlmetrics.Registry.MustRegister(metricsRecorder.Collectors()...)
watchNamespace := "" watchNamespace := ""
if !watchAllNamespaces { if !watchAllNamespaces {
@ -147,6 +144,9 @@ func main() {
probes.SetupChecks(mgr, setupLog) probes.SetupChecks(mgr, setupLog)
pprof.SetupHandlers(mgr, setupLog) pprof.SetupHandlers(mgr, setupLog)
eventsH := helper.MakeEvents(mgr, controllerName, eventRecorder)
metricsH := helper.MustMakeMetrics(mgr)
if storageAdvAddr == "" { if storageAdvAddr == "" {
storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog) storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog)
} }
@ -154,11 +154,9 @@ func main() {
if err = (&controllers.GitRepositoryReconciler{ if err = (&controllers.GitRepositoryReconciler{
Client: mgr.GetClient(), Client: mgr.GetClient(),
Scheme: mgr.GetScheme(), Events: eventsH,
Metrics: metricsH,
Storage: storage, Storage: storage,
EventRecorder: mgr.GetEventRecorderFor(controllerName),
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{ }).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent, MaxConcurrentReconciles: concurrent,
DependencyRequeueInterval: requeueDependency, DependencyRequeueInterval: requeueDependency,
@ -173,7 +171,7 @@ func main() {
Getters: getters, Getters: getters,
EventRecorder: mgr.GetEventRecorderFor(controllerName), EventRecorder: mgr.GetEventRecorderFor(controllerName),
ExternalEventRecorder: eventRecorder, ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder, MetricsRecorder: metricsH.MetricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{ }).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent, MaxConcurrentReconciles: concurrent,
}); err != nil { }); err != nil {
@ -187,7 +185,7 @@ func main() {
Getters: getters, Getters: getters,
EventRecorder: mgr.GetEventRecorderFor(controllerName), EventRecorder: mgr.GetEventRecorderFor(controllerName),
ExternalEventRecorder: eventRecorder, ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder, MetricsRecorder: metricsH.MetricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{ }).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{
MaxConcurrentReconciles: concurrent, MaxConcurrentReconciles: concurrent,
}); err != nil { }); err != nil {
@ -200,7 +198,7 @@ func main() {
Storage: storage, Storage: storage,
EventRecorder: mgr.GetEventRecorderFor(controllerName), EventRecorder: mgr.GetEventRecorderFor(controllerName),
ExternalEventRecorder: eventRecorder, ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder, MetricsRecorder: metricsH.MetricsRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{ }).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
MaxConcurrentReconciles: concurrent, MaxConcurrentReconciles: concurrent,
}); err != nil { }); err != nil {