Calculate checksums during file writes
This commit is contained in:
parent
1ab12869ac
commit
42706a342b
|
@ -122,62 +122,45 @@ const (
|
|||
GitOperationFailedReason string = "GitOperationFailed"
|
||||
)
|
||||
|
||||
// GitRepositoryReady sets the given artifact and url on the
|
||||
// GitRepository and resets the conditions to SourceCondition of
|
||||
// type Ready with status true and the given reason and message.
|
||||
// It returns the modified GitRepository.
|
||||
func GitRepositoryReady(repository GitRepository, artifact Artifact, url, reason, message string) GitRepository {
|
||||
repository.Status.Conditions = []SourceCondition{
|
||||
{
|
||||
Type: ReadyCondition,
|
||||
Status: corev1.ConditionTrue,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: reason,
|
||||
Message: message,
|
||||
},
|
||||
}
|
||||
repository.Status.URL = url
|
||||
|
||||
if repository.Status.Artifact != nil {
|
||||
if repository.Status.Artifact.Path != artifact.Path {
|
||||
repository.Status.Artifact = &artifact
|
||||
}
|
||||
} else {
|
||||
repository.Status.Artifact = &artifact
|
||||
}
|
||||
|
||||
return repository
|
||||
}
|
||||
|
||||
// GitRepositoryProgressing resets the conditions of the GitRepository
|
||||
// to SourceCondition of type Ready with status unknown and
|
||||
// progressing reason and message. It returns the modified GitRepository.
|
||||
func GitRepositoryProgressing(repository GitRepository) GitRepository {
|
||||
repository.Status.Conditions = []SourceCondition{
|
||||
{
|
||||
Type: ReadyCondition,
|
||||
Status: corev1.ConditionUnknown,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: ProgressingReason,
|
||||
Message: "reconciliation in progress",
|
||||
},
|
||||
}
|
||||
repository.Status.URL = ""
|
||||
repository.Status.Artifact = nil
|
||||
repository.Status.Conditions = []SourceCondition{}
|
||||
SetGitRepositoryCondition(&repository, ReadyCondition, corev1.ConditionUnknown, ProgressingReason, "reconciliation in progress")
|
||||
return repository
|
||||
}
|
||||
|
||||
// GitRepositoryNotReady resets the conditions of the GitRepository
|
||||
// to SourceCondition of type Ready with status false and the given
|
||||
// reason and message. It returns the modified GitRepository.
|
||||
// SetGitRepositoryCondition sets the given condition with the given status, reason and message
|
||||
// on the GitRepository.
|
||||
func SetGitRepositoryCondition(repository *GitRepository, condition string, status corev1.ConditionStatus, reason, message string) {
|
||||
repository.Status.Conditions = filterOutSourceCondition(repository.Status.Conditions, condition)
|
||||
repository.Status.Conditions = append(repository.Status.Conditions, SourceCondition{
|
||||
Type: condition,
|
||||
Status: status,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: reason,
|
||||
Message: message,
|
||||
})
|
||||
}
|
||||
|
||||
// GitRepositoryReady sets the given artifact and url on the GitRepository
|
||||
// and sets the ReadyCondition to True, with the given reason and
|
||||
// message. It returns the modified GitRepository.
|
||||
func GitRepositoryReady(repository GitRepository, artifact Artifact, url, reason, message string) GitRepository {
|
||||
repository.Status.Artifact = &artifact
|
||||
repository.Status.URL = url
|
||||
SetGitRepositoryCondition(&repository, ReadyCondition, corev1.ConditionTrue, reason, message)
|
||||
return repository
|
||||
}
|
||||
|
||||
// GitRepositoryNotReady sets the ReadyCondition on the given GitRepository
|
||||
// to False, with the given reason and message. It returns the modified
|
||||
// GitRepository.
|
||||
func GitRepositoryNotReady(repository GitRepository, reason, message string) GitRepository {
|
||||
repository.Status.Conditions = []SourceCondition{
|
||||
{
|
||||
Type: ReadyCondition,
|
||||
Status: corev1.ConditionFalse,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: reason,
|
||||
Message: message,
|
||||
},
|
||||
}
|
||||
SetGitRepositoryCondition(&repository, ReadyCondition, corev1.ConditionFalse, reason, message)
|
||||
return repository
|
||||
}
|
||||
|
||||
|
|
|
@ -92,7 +92,7 @@ const (
|
|||
ChartPackageSucceededReason string = "ChartPackageSucceeded"
|
||||
)
|
||||
|
||||
// HelmReleaseProgressing resets any failures and registers progress toward reconciling the given HelmRelease
|
||||
// HelmChartProgressing resets any failures and registers progress toward reconciling the given HelmChart
|
||||
// by setting the ReadyCondition to ConditionUnknown for ProgressingReason.
|
||||
func HelmChartProgressing(chart HelmChart) HelmChart {
|
||||
chart.Status.URL = ""
|
||||
|
|
|
@ -76,62 +76,45 @@ const (
|
|||
IndexationSucceededReason string = "IndexationSucceed"
|
||||
)
|
||||
|
||||
// HelmRepositoryReady sets the given artifact and url on the
|
||||
// HelmRepository and resets the conditions to SourceCondition of
|
||||
// type Ready with status true and the given reason and message.
|
||||
// It returns the modified HelmRepository.
|
||||
func HelmRepositoryReady(repository HelmRepository, artifact Artifact, url, reason, message string) HelmRepository {
|
||||
repository.Status.Conditions = []SourceCondition{
|
||||
{
|
||||
Type: ReadyCondition,
|
||||
Status: corev1.ConditionTrue,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: reason,
|
||||
Message: message,
|
||||
},
|
||||
}
|
||||
repository.Status.URL = url
|
||||
|
||||
if repository.Status.Artifact != nil {
|
||||
if repository.Status.Artifact.Path != artifact.Path {
|
||||
repository.Status.Artifact = &artifact
|
||||
}
|
||||
} else {
|
||||
repository.Status.Artifact = &artifact
|
||||
}
|
||||
|
||||
return repository
|
||||
}
|
||||
|
||||
// HelmRepositoryProgressing resets the conditions of the HelmRepository
|
||||
// to SourceCondition of type Ready with status unknown and
|
||||
// progressing reason and message. It returns the modified HelmRepository.
|
||||
func HelmRepositoryProgressing(repository HelmRepository) HelmRepository {
|
||||
repository.Status.Conditions = []SourceCondition{
|
||||
{
|
||||
Type: ReadyCondition,
|
||||
Status: corev1.ConditionUnknown,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: ProgressingReason,
|
||||
Message: "reconciliation in progress",
|
||||
},
|
||||
}
|
||||
repository.Status.URL = ""
|
||||
repository.Status.Artifact = nil
|
||||
repository.Status.Conditions = []SourceCondition{}
|
||||
SetHelmRepositoryCondition(&repository, ReadyCondition, corev1.ConditionUnknown, ProgressingReason, "reconciliation in progress")
|
||||
return repository
|
||||
}
|
||||
|
||||
// HelmRepositoryNotReady resets the conditions of the HelmRepository
|
||||
// to SourceCondition of type Ready with status false and the given
|
||||
// reason and message. It returns the modified HelmRepository.
|
||||
// SetHelmRepositoryCondition sets the given condition with the given status,
|
||||
// reason and message on the HelmRepository.
|
||||
func SetHelmRepositoryCondition(repository *HelmRepository, condition string, status corev1.ConditionStatus, reason, message string) {
|
||||
repository.Status.Conditions = filterOutSourceCondition(repository.Status.Conditions, condition)
|
||||
repository.Status.Conditions = append(repository.Status.Conditions, SourceCondition{
|
||||
Type: condition,
|
||||
Status: status,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: reason,
|
||||
Message: message,
|
||||
})
|
||||
}
|
||||
|
||||
// HelmRepositoryReady sets the given artifact and url on the HelmRepository
|
||||
// and sets the ReadyCondition to True, with the given reason and
|
||||
// message. It returns the modified HelmRepository.
|
||||
func HelmRepositoryReady(repository HelmRepository, artifact Artifact, url, reason, message string) HelmRepository {
|
||||
repository.Status.Artifact = &artifact
|
||||
repository.Status.URL = url
|
||||
SetHelmRepositoryCondition(&repository, ReadyCondition, corev1.ConditionTrue, reason, message)
|
||||
return repository
|
||||
}
|
||||
|
||||
// HelmRepositoryNotReady sets the ReadyCondition on the given HelmRepository
|
||||
// to False, with the given reason and message. It returns the modified
|
||||
// HelmRepository.
|
||||
func HelmRepositoryNotReady(repository HelmRepository, reason, message string) HelmRepository {
|
||||
repository.Status.Conditions = []SourceCondition{
|
||||
{
|
||||
Type: ReadyCondition,
|
||||
Status: corev1.ConditionFalse,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: reason,
|
||||
Message: message,
|
||||
},
|
||||
}
|
||||
SetHelmRepositoryCondition(&repository, ReadyCondition, corev1.ConditionFalse, reason, message)
|
||||
return repository
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,6 @@ import (
|
|||
"github.com/go-git/go-git/v5/plumbing/transport"
|
||||
"github.com/go-logr/logr"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
kuberecorder "k8s.io/client-go/tools/record"
|
||||
|
@ -100,13 +99,7 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
|
|||
}
|
||||
|
||||
// set initial status
|
||||
if reset, status := r.shouldResetStatus(repository); reset {
|
||||
repository.Status = status
|
||||
if err := r.Status().Update(ctx, &repository); err != nil {
|
||||
log.Error(err, "unable to update status")
|
||||
return ctrl.Result{Requeue: true}, err
|
||||
}
|
||||
} else {
|
||||
if repository.Generation == 0 || repository.GetArtifact() != nil && !r.Storage.ArtifactExist(*repository.GetArtifact()) {
|
||||
repository = sourcev1.GitRepositoryProgressing(repository)
|
||||
if err := r.Status().Update(ctx, &repository); err != nil {
|
||||
log.Error(err, "unable to update status")
|
||||
|
@ -202,6 +195,12 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, repository sour
|
|||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.GitOperationFailedReason, err.Error()), err
|
||||
}
|
||||
|
||||
// return early on unchanged revision
|
||||
artifact := r.Storage.NewArtifactFor(repository.Kind, repository.GetObjectMeta(), revision, fmt.Sprintf("%s.tar.gz", commit.Hash.String()))
|
||||
if repository.GetArtifact() != nil && repository.GetArtifact().Revision == revision {
|
||||
return repository, nil
|
||||
}
|
||||
|
||||
// verify PGP signature
|
||||
if repository.Spec.Verification != nil {
|
||||
err := r.verify(ctx, types.NamespacedName{
|
||||
|
@ -213,11 +212,6 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, repository sour
|
|||
}
|
||||
}
|
||||
|
||||
// TODO(hidde): implement checksum when https://github.com/fluxcd/source-controller/pull/133
|
||||
// has been merged.
|
||||
artifact := r.Storage.ArtifactFor(repository.Kind, repository.ObjectMeta.GetObjectMeta(),
|
||||
fmt.Sprintf("%s.tar.gz", commit.Hash.String()), revision, "")
|
||||
|
||||
// create artifact dir
|
||||
err = r.Storage.MkdirAll(artifact)
|
||||
if err != nil {
|
||||
|
@ -234,7 +228,7 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, repository sour
|
|||
defer unlock()
|
||||
|
||||
// archive artifact and check integrity
|
||||
if err := r.Storage.Archive(artifact, tmpGit, repository.Spec); err != nil {
|
||||
if err := r.Storage.Archive(&artifact, tmpGit, repository.Spec); err != nil {
|
||||
err = fmt.Errorf("storage archive error: %w", err)
|
||||
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
|
@ -250,32 +244,6 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, repository sour
|
|||
return sourcev1.GitRepositoryReady(repository, artifact, url, sourcev1.GitOperationSucceedReason, message), nil
|
||||
}
|
||||
|
||||
// shouldResetStatus returns a boolean indicating if the status of the
|
||||
// given repository should be reset.
|
||||
func (r *GitRepositoryReconciler) shouldResetStatus(repository sourcev1.GitRepository) (bool, sourcev1.GitRepositoryStatus) {
|
||||
resetStatus := false
|
||||
if repository.Status.Artifact != nil {
|
||||
if !r.Storage.ArtifactExist(*repository.Status.Artifact) {
|
||||
resetStatus = true
|
||||
}
|
||||
}
|
||||
|
||||
if len(repository.Status.Conditions) == 0 || resetStatus {
|
||||
resetStatus = true
|
||||
}
|
||||
|
||||
return resetStatus, sourcev1.GitRepositoryStatus{
|
||||
Conditions: []sourcev1.SourceCondition{
|
||||
{
|
||||
Type: sourcev1.ReadyCondition,
|
||||
Status: corev1.ConditionUnknown,
|
||||
Reason: sourcev1.InitializingReason,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// verify returns an error if the PGP signature can't be verified
|
||||
func (r *GitRepositoryReconciler) verify(ctx context.Context, publicKeySecret types.NamespacedName, commit *object.Commit) error {
|
||||
if commit.PGPSignature == "" {
|
||||
|
@ -304,10 +272,10 @@ func (r *GitRepositoryReconciler) verify(ctx context.Context, publicKeySecret ty
|
|||
// the given repository.
|
||||
func (r *GitRepositoryReconciler) gc(repository sourcev1.GitRepository, all bool) error {
|
||||
if all {
|
||||
return r.Storage.RemoveAll(r.Storage.ArtifactFor(repository.Kind, repository.GetObjectMeta(), "", "", ""))
|
||||
return r.Storage.RemoveAll(r.Storage.NewArtifactFor(repository.Kind, repository.GetObjectMeta(), "", ""))
|
||||
}
|
||||
if repository.Status.Artifact != nil {
|
||||
return r.Storage.RemoveAllButCurrent(*repository.Status.Artifact)
|
||||
if repository.GetArtifact() != nil {
|
||||
return r.Storage.RemoveAllButCurrent(*repository.GetArtifact())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -17,15 +17,12 @@ limitations under the License.
|
|||
package controllers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -197,6 +194,11 @@ func (r *HelmChartReconciler) reconcileFromHelmRepository(ctx context.Context,
|
|||
return sourcev1.HelmChartNotReady(chart, sourcev1.ChartPullFailedReason, err.Error()), err
|
||||
}
|
||||
|
||||
// return early on unchanged chart version
|
||||
if repository.GetArtifact() != nil && repository.GetArtifact().Revision == cv.Version {
|
||||
return chart, nil
|
||||
}
|
||||
|
||||
// TODO(hidde): according to the Helm source the first item is not
|
||||
// always the correct one to pick, check for updates once in awhile.
|
||||
// Ref: https://github.com/helm/helm/blob/v3.3.0/pkg/downloader/chart_downloader.go#L241
|
||||
|
@ -257,10 +259,7 @@ func (r *HelmChartReconciler) reconcileFromHelmRepository(ctx context.Context,
|
|||
return sourcev1.HelmChartNotReady(chart, sourcev1.ChartPullFailedReason, err.Error()), err
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
sum := r.Storage.Checksum(io.TeeReader(res, &buf))
|
||||
artifact := r.Storage.ArtifactFor(chart.Kind, chart.GetObjectMeta(),
|
||||
fmt.Sprintf("%s-%s-%s.tgz", cv.Name, cv.Version, sum), cv.Version, sum)
|
||||
artifact := r.Storage.NewArtifactFor(chart.Kind, chart.GetObjectMeta(), cv.Version, fmt.Sprintf("%s-%s.tgz", cv.Name, cv.Version))
|
||||
|
||||
// create artifact dir
|
||||
err = r.Storage.MkdirAll(artifact)
|
||||
|
@ -278,8 +277,7 @@ func (r *HelmChartReconciler) reconcileFromHelmRepository(ctx context.Context,
|
|||
defer unlock()
|
||||
|
||||
// save artifact to storage
|
||||
err = r.Storage.AtomicWriteFile(artifact, &buf, 0644)
|
||||
if err != nil {
|
||||
if err := r.Storage.AtomicWriteFile(&artifact, res, 0644); err != nil {
|
||||
err = fmt.Errorf("unable to write chart file: %w", err)
|
||||
return sourcev1.HelmChartNotReady(chart, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
|
@ -315,7 +313,7 @@ func (r *HelmChartReconciler) getChartRepositoryWithArtifact(ctx context.Context
|
|||
return repository, err
|
||||
}
|
||||
|
||||
if repository.Status.Artifact == nil {
|
||||
if repository.GetArtifact() == nil {
|
||||
err = fmt.Errorf("no repository index artifact found in HelmRepository '%s'", repository.Name)
|
||||
}
|
||||
|
||||
|
@ -360,14 +358,11 @@ func (r *HelmChartReconciler) reconcileFromGitRepository(ctx context.Context,
|
|||
}
|
||||
|
||||
// return early on unchanged chart version
|
||||
if chart.Status.Artifact != nil && chartMetadata.Version == chart.Status.Artifact.Revision {
|
||||
if chart.GetArtifact() != nil && chart.GetArtifact().Revision == chartMetadata.Version {
|
||||
return chart, nil
|
||||
}
|
||||
|
||||
// TODO(hidde): implement checksum when https://github.com/fluxcd/source-controller/pull/133
|
||||
// has been merged.
|
||||
artifact := r.Storage.ArtifactFor(chart.Kind, chart.ObjectMeta.GetObjectMeta(),
|
||||
fmt.Sprintf("%s-%s.tgz", chartMetadata.Name, chartMetadata.Version), chartMetadata.Version, "")
|
||||
artifact := r.Storage.NewArtifactFor(chart.Kind, chart.ObjectMeta.GetObjectMeta(), chartMetadata.Version, fmt.Sprintf("%s-%s.tgz", chartMetadata.Name, chartMetadata.Version))
|
||||
|
||||
// create artifact dir
|
||||
err = r.Storage.MkdirAll(artifact)
|
||||
|
@ -386,22 +381,35 @@ func (r *HelmChartReconciler) reconcileFromGitRepository(ctx context.Context,
|
|||
|
||||
// package chart
|
||||
pkg := action.NewPackage()
|
||||
pkg.Destination = filepath.Dir(r.Storage.LocalPath(artifact))
|
||||
_, err = pkg.Run(chartPath, nil)
|
||||
pkg.Destination = tmpDir
|
||||
src, err := pkg.Run(chartPath, nil)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("chart package error: %w", err)
|
||||
return sourcev1.HelmChartNotReady(chart, sourcev1.ChartPackageFailedReason, err.Error()), err
|
||||
}
|
||||
|
||||
// copy chart package
|
||||
cf, err := os.Open(src)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to open chart package: %w", err)
|
||||
return sourcev1.HelmChartNotReady(chart, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
if err := r.Storage.Copy(&artifact, cf); err != nil {
|
||||
cf.Close()
|
||||
err = fmt.Errorf("failed to copy chart package to storage: %w", err)
|
||||
return sourcev1.HelmChartNotReady(chart, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
cf.Close()
|
||||
|
||||
// update symlink
|
||||
chartUrl, err := r.Storage.Symlink(artifact, fmt.Sprintf("%s-latest.tgz", chartMetadata.Name))
|
||||
cUrl, err := r.Storage.Symlink(artifact, fmt.Sprintf("%s-latest.tgz", chartMetadata.Name))
|
||||
if err != nil {
|
||||
err = fmt.Errorf("storage error: %w", err)
|
||||
return sourcev1.HelmChartNotReady(chart, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
|
||||
message := fmt.Sprintf("Fetched and packaged revision: %s", artifact.Revision)
|
||||
return sourcev1.HelmChartReady(chart, artifact, chartUrl, sourcev1.ChartPackageSucceededReason, message), nil
|
||||
return sourcev1.HelmChartReady(chart, artifact, cUrl, sourcev1.ChartPackageSucceededReason, message), nil
|
||||
}
|
||||
|
||||
// getGitRepositoryWithArtifact attempts to get the GitRepository for the given
|
||||
|
@ -424,7 +432,7 @@ func (r *HelmChartReconciler) getGitRepositoryWithArtifact(ctx context.Context,
|
|||
return repository, err
|
||||
}
|
||||
|
||||
if repository.Status.Artifact == nil {
|
||||
if repository.GetArtifact() == nil {
|
||||
err = fmt.Errorf("no artifact found for GitRepository '%s'", repository.Name)
|
||||
}
|
||||
|
||||
|
@ -435,10 +443,10 @@ func (r *HelmChartReconciler) getGitRepositoryWithArtifact(ctx context.Context,
|
|||
// the given chart.
|
||||
func (r *HelmChartReconciler) gc(chart sourcev1.HelmChart, all bool) error {
|
||||
if all {
|
||||
return r.Storage.RemoveAll(r.Storage.ArtifactFor(chart.Kind, chart.GetObjectMeta(), "", "", ""))
|
||||
return r.Storage.RemoveAll(r.Storage.NewArtifactFor(chart.Kind, chart.GetObjectMeta(), "", ""))
|
||||
}
|
||||
if chart.Status.Artifact != nil {
|
||||
return r.Storage.RemoveAllButCurrent(*chart.Status.Artifact)
|
||||
if chart.GetArtifact() != nil {
|
||||
return r.Storage.RemoveAllButCurrent(*chart.GetArtifact())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import (
|
|||
"helm.sh/helm/v3/pkg/getter"
|
||||
"helm.sh/helm/v3/pkg/repo"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
kuberecorder "k8s.io/client-go/tools/record"
|
||||
|
@ -42,6 +41,7 @@ import (
|
|||
|
||||
"github.com/fluxcd/pkg/recorder"
|
||||
"github.com/fluxcd/pkg/runtime/predicates"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
|
||||
"github.com/fluxcd/source-controller/internal/helm"
|
||||
)
|
||||
|
@ -104,13 +104,7 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
|
|||
}
|
||||
|
||||
// set initial status
|
||||
if reset, status := r.shouldResetStatus(repository); reset {
|
||||
repository.Status = status
|
||||
if err := r.Status().Update(ctx, &repository); err != nil {
|
||||
log.Error(err, "unable to update status")
|
||||
return ctrl.Result{Requeue: true}, err
|
||||
}
|
||||
} else {
|
||||
if repository.Generation == 0 || repository.GetArtifact() != nil && !r.Storage.ArtifactExist(*repository.GetArtifact()) {
|
||||
repository = sourcev1.HelmRepositoryProgressing(repository)
|
||||
if err := r.Status().Update(ctx, &repository); err != nil {
|
||||
log.Error(err, "unable to update status")
|
||||
|
@ -207,7 +201,6 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, repository sou
|
|||
}
|
||||
|
||||
clientOpts = append(clientOpts, getter.WithTimeout(repository.GetTimeout()))
|
||||
|
||||
res, err := c.Get(u.String(), clientOpts...)
|
||||
if err != nil {
|
||||
return sourcev1.HelmRepositoryNotReady(repository, sourcev1.IndexationFailedReason, err.Error()), err
|
||||
|
@ -217,21 +210,24 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, repository sou
|
|||
if err != nil {
|
||||
return sourcev1.HelmRepositoryNotReady(repository, sourcev1.IndexationFailedReason, err.Error()), err
|
||||
}
|
||||
|
||||
i := repo.IndexFile{}
|
||||
if err := yaml.Unmarshal(b, &i); err != nil {
|
||||
return sourcev1.HelmRepositoryNotReady(repository, sourcev1.IndexationFailedReason, err.Error()), err
|
||||
}
|
||||
i.SortEntries()
|
||||
|
||||
// return early on unchanged generation
|
||||
if repository.GetArtifact() != nil && repository.GetArtifact().Revision == i.Generated.Format(time.RFC3339Nano) {
|
||||
return repository, nil
|
||||
}
|
||||
|
||||
i.SortEntries()
|
||||
b, err = yaml.Marshal(&i)
|
||||
if err != nil {
|
||||
return sourcev1.HelmRepositoryNotReady(repository, sourcev1.IndexationFailedReason, err.Error()), err
|
||||
}
|
||||
|
||||
sum := r.Storage.Checksum(bytes.NewReader(b))
|
||||
artifact := r.Storage.ArtifactFor(repository.Kind, repository.ObjectMeta.GetObjectMeta(),
|
||||
fmt.Sprintf("index-%s.yaml", sum), i.Generated.Format(time.RFC3339Nano), sum)
|
||||
artifact := r.Storage.NewArtifactFor(repository.Kind, repository.ObjectMeta.GetObjectMeta(), i.Generated.Format(time.RFC3339Nano),
|
||||
fmt.Sprintf("index-%s.yaml", url.PathEscape(i.Generated.Format(time.RFC3339Nano))))
|
||||
|
||||
// create artifact dir
|
||||
err = r.Storage.MkdirAll(artifact)
|
||||
|
@ -249,8 +245,7 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, repository sou
|
|||
defer unlock()
|
||||
|
||||
// save artifact to storage
|
||||
err = r.Storage.AtomicWriteFile(artifact, bytes.NewReader(b), 0644)
|
||||
if err != nil {
|
||||
if err := r.Storage.AtomicWriteFile(&artifact, bytes.NewReader(b), 0644); err != nil {
|
||||
err = fmt.Errorf("unable to write repository index file: %w", err)
|
||||
return sourcev1.HelmRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
|
||||
}
|
||||
|
@ -266,41 +261,14 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, repository sou
|
|||
return sourcev1.HelmRepositoryReady(repository, artifact, indexURL, sourcev1.IndexationSucceededReason, message), nil
|
||||
}
|
||||
|
||||
// shouldResetStatus returns a boolean indicating if the status of the
|
||||
// given repository should be reset.
|
||||
func (r *HelmRepositoryReconciler) shouldResetStatus(repository sourcev1.HelmRepository) (bool, sourcev1.HelmRepositoryStatus) {
|
||||
resetStatus := false
|
||||
if repository.Status.Artifact != nil {
|
||||
if !r.Storage.ArtifactExist(*repository.Status.Artifact) {
|
||||
resetStatus = true
|
||||
}
|
||||
}
|
||||
|
||||
// set initial status
|
||||
if len(repository.Status.Conditions) == 0 {
|
||||
resetStatus = true
|
||||
}
|
||||
|
||||
return resetStatus, sourcev1.HelmRepositoryStatus{
|
||||
Conditions: []sourcev1.SourceCondition{
|
||||
{
|
||||
Type: sourcev1.ReadyCondition,
|
||||
Status: corev1.ConditionUnknown,
|
||||
Reason: sourcev1.InitializingReason,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// gc performs a garbage collection on all but current artifacts of
|
||||
// the given repository.
|
||||
func (r *HelmRepositoryReconciler) gc(repository sourcev1.HelmRepository, all bool) error {
|
||||
if all {
|
||||
return r.Storage.RemoveAll(r.Storage.ArtifactFor(repository.Kind, repository.GetObjectMeta(), "", "", ""))
|
||||
return r.Storage.RemoveAll(r.Storage.NewArtifactFor(repository.Kind, repository.GetObjectMeta(), "", ""))
|
||||
}
|
||||
if repository.Status.Artifact != nil {
|
||||
return r.Storage.RemoveAllButCurrent(*repository.Status.Artifact)
|
||||
if repository.GetArtifact() != nil {
|
||||
return r.Storage.RemoveAllButCurrent(*repository.GetArtifact())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"compress/gzip"
|
||||
"crypto/sha1"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
@ -62,7 +63,6 @@ func NewStorage(basePath string, hostname string, timeout time.Duration) (*Stora
|
|||
if f, err := os.Stat(basePath); os.IsNotExist(err) || !f.IsDir() {
|
||||
return nil, fmt.Errorf("invalid dir path: %s", basePath)
|
||||
}
|
||||
|
||||
return &Storage{
|
||||
BasePath: basePath,
|
||||
Hostname: hostname,
|
||||
|
@ -70,18 +70,23 @@ func NewStorage(basePath string, hostname string, timeout time.Duration) (*Stora
|
|||
}, nil
|
||||
}
|
||||
|
||||
// ArtifactFor returns an artifact for the v1alpha1.Source.
|
||||
func (s *Storage) ArtifactFor(kind string, metadata metav1.Object, fileName, revision, checksum string) sourcev1.Artifact {
|
||||
// NewArtifactFor returns a new v1alpha1.Artifact.
|
||||
func (s *Storage) NewArtifactFor(kind string, metadata metav1.Object, revision, fileName string) sourcev1.Artifact {
|
||||
path := sourcev1.ArtifactPath(kind, metadata.GetNamespace(), metadata.GetName(), fileName)
|
||||
url := fmt.Sprintf("http://%s/%s", s.Hostname, path)
|
||||
|
||||
return sourcev1.Artifact{
|
||||
Path: path,
|
||||
URL: url,
|
||||
Revision: revision,
|
||||
Checksum: checksum,
|
||||
LastUpdateTime: metav1.Now(),
|
||||
artifact := sourcev1.Artifact{
|
||||
Path: path,
|
||||
Revision: revision,
|
||||
}
|
||||
s.SetArtifactURL(&artifact)
|
||||
return artifact
|
||||
}
|
||||
|
||||
// SetArtifactURL sets the URL on the given v1alpha1.Artifact.
|
||||
func (s Storage) SetArtifactURL(artifact *sourcev1.Artifact) {
|
||||
if artifact.Path == "" {
|
||||
return
|
||||
}
|
||||
artifact.URL = fmt.Sprintf("http://%s/%s", s.Hostname, artifact.Path)
|
||||
}
|
||||
|
||||
// MkdirAll calls os.MkdirAll for the given v1alpha1.Artifact base dir.
|
||||
|
@ -132,12 +137,12 @@ func (s *Storage) ArtifactExist(artifact sourcev1.Artifact) bool {
|
|||
return fi.Mode().IsRegular()
|
||||
}
|
||||
|
||||
// Archive atomically creates a tar.gz to the v1alpha1.Artifact path from the given dir,
|
||||
// excluding any VCS specific files and directories, or any of the excludes defined in
|
||||
// the excludeFiles.
|
||||
func (s *Storage) Archive(artifact sourcev1.Artifact, dir string, spec sourcev1.GitRepositorySpec) (err error) {
|
||||
if _, err := os.Stat(dir); err != nil {
|
||||
return err
|
||||
// Archive atomically archives the given directory as a tarball to the given v1alpha1.Artifact
|
||||
// path, excluding any VCS specific files and directories, or any of the excludes defined in
|
||||
// the excludeFiles. If successful, it sets the checksum and last update time on the artifact.
|
||||
func (s *Storage) Archive(artifact *sourcev1.Artifact, dir string, spec sourcev1.GitRepositorySpec) (err error) {
|
||||
if f, err := os.Stat(dir); os.IsNotExist(err) || !f.IsDir() {
|
||||
return fmt.Errorf("invalid dir path: %s", dir)
|
||||
}
|
||||
|
||||
ps, err := loadExcludePatterns(dir, spec)
|
||||
|
@ -146,37 +151,40 @@ func (s *Storage) Archive(artifact sourcev1.Artifact, dir string, spec sourcev1.
|
|||
}
|
||||
matcher := gitignore.NewMatcher(ps)
|
||||
|
||||
localPath := s.LocalPath(artifact)
|
||||
tmpGzFile, err := ioutil.TempFile(filepath.Split(localPath))
|
||||
localPath := s.LocalPath(*artifact)
|
||||
tf, err := ioutil.TempFile(filepath.Split(localPath))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tmpName := tmpGzFile.Name()
|
||||
tmpName := tf.Name()
|
||||
defer func() {
|
||||
if err != nil {
|
||||
os.Remove(tmpName)
|
||||
}
|
||||
}()
|
||||
|
||||
gw := gzip.NewWriter(tmpGzFile)
|
||||
h := newHash()
|
||||
mw := io.MultiWriter(h, tf)
|
||||
|
||||
gw := gzip.NewWriter(mw)
|
||||
tw := tar.NewWriter(gw)
|
||||
if err := writeToArchiveExcludeMatches(dir, matcher, tw); err != nil {
|
||||
tw.Close()
|
||||
gw.Close()
|
||||
tmpGzFile.Close()
|
||||
tf.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := tw.Close(); err != nil {
|
||||
gw.Close()
|
||||
tmpGzFile.Close()
|
||||
tf.Close()
|
||||
return err
|
||||
}
|
||||
if err := gw.Close(); err != nil {
|
||||
tmpGzFile.Close()
|
||||
tf.Close()
|
||||
return err
|
||||
}
|
||||
if err := tmpGzFile.Close(); err != nil {
|
||||
if err := tf.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -184,7 +192,13 @@ func (s *Storage) Archive(artifact sourcev1.Artifact, dir string, spec sourcev1.
|
|||
return err
|
||||
}
|
||||
|
||||
return fs.RenameWithFallback(tmpName, localPath)
|
||||
if err := fs.RenameWithFallback(tmpName, localPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
artifact.Checksum = fmt.Sprintf("%x", h.Sum(nil))
|
||||
artifact.LastUpdateTime = metav1.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeToArchiveExcludeMatches walks over the given dir and writes any regular file that does
|
||||
|
@ -239,33 +253,81 @@ func writeToArchiveExcludeMatches(dir string, matcher gitignore.Matcher, writer
|
|||
return filepath.Walk(dir, fn)
|
||||
}
|
||||
|
||||
// AtomicWriteFile atomically writes a file to the v1alpha1.Artifact Path.
|
||||
func (s *Storage) AtomicWriteFile(artifact sourcev1.Artifact, reader io.Reader, mode os.FileMode) (err error) {
|
||||
localPath := s.LocalPath(artifact)
|
||||
tmpFile, err := ioutil.TempFile(filepath.Split(localPath))
|
||||
// AtomicWriteFile atomically writes the io.Reader contents to the v1alpha1.Artifact path.
|
||||
// If successful, it sets the checksum and last update time on the artifact.
|
||||
func (s *Storage) AtomicWriteFile(artifact *sourcev1.Artifact, reader io.Reader, mode os.FileMode) (err error) {
|
||||
localPath := s.LocalPath(*artifact)
|
||||
tf, err := ioutil.TempFile(filepath.Split(localPath))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tmpName := tmpFile.Name()
|
||||
tfName := tf.Name()
|
||||
defer func() {
|
||||
if err != nil {
|
||||
os.Remove(tmpName)
|
||||
os.Remove(tfName)
|
||||
}
|
||||
}()
|
||||
if _, err := io.Copy(tmpFile, reader); err != nil {
|
||||
tmpFile.Close()
|
||||
|
||||
h := newHash()
|
||||
mw := io.MultiWriter(h, tf)
|
||||
|
||||
if _, err := io.Copy(mw, reader); err != nil {
|
||||
tf.Close()
|
||||
return err
|
||||
}
|
||||
if err := tmpFile.Close(); err != nil {
|
||||
if err := tf.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.Chmod(tmpName, mode); err != nil {
|
||||
|
||||
if err := os.Chmod(tfName, mode); err != nil {
|
||||
return err
|
||||
}
|
||||
return fs.RenameWithFallback(tmpName, localPath)
|
||||
|
||||
if err := fs.RenameWithFallback(tfName, localPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
artifact.Checksum = fmt.Sprintf("%x", h.Sum(nil))
|
||||
artifact.LastUpdateTime = metav1.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Symlink creates or updates a symbolic link for the given artifact
|
||||
// Copy atomically copies the io.Reader contents to the v1alpha1.Artifact path.
|
||||
// If successful, it sets the checksum and last update time on the artifact.
|
||||
func (s *Storage) Copy(artifact *sourcev1.Artifact, reader io.Reader) (err error) {
|
||||
localPath := s.LocalPath(*artifact)
|
||||
tf, err := ioutil.TempFile(filepath.Split(localPath))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tfName := tf.Name()
|
||||
defer func() {
|
||||
if err != nil {
|
||||
os.Remove(tfName)
|
||||
}
|
||||
}()
|
||||
|
||||
h := newHash()
|
||||
mw := io.MultiWriter(h, tf)
|
||||
|
||||
if _, err := io.Copy(mw, reader); err != nil {
|
||||
tf.Close()
|
||||
return err
|
||||
}
|
||||
if err := tf.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := fs.RenameWithFallback(tfName, localPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
artifact.Checksum = fmt.Sprintf("%x", h.Sum(nil))
|
||||
artifact.LastUpdateTime = metav1.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Symlink creates or updates a symbolic link for the given v1alpha1.Artifact
|
||||
// and returns the URL for the symlink.
|
||||
func (s *Storage) Symlink(artifact sourcev1.Artifact, linkName string) (string, error) {
|
||||
localPath := s.LocalPath(artifact)
|
||||
|
@ -285,14 +347,13 @@ func (s *Storage) Symlink(artifact sourcev1.Artifact, linkName string) (string,
|
|||
return "", err
|
||||
}
|
||||
|
||||
parts := strings.Split(artifact.URL, "/")
|
||||
url := strings.Replace(artifact.URL, parts[len(parts)-1], linkName, 1)
|
||||
url := fmt.Sprintf("http://%s/%s", s.Hostname, filepath.Join(filepath.Dir(artifact.Path), linkName))
|
||||
return url, nil
|
||||
}
|
||||
|
||||
// Checksum returns the SHA1 checksum for the data of the given io.Reader as a string.
|
||||
func (s *Storage) Checksum(reader io.Reader) string {
|
||||
h := sha1.New()
|
||||
h := newHash()
|
||||
_, _ = io.Copy(h, reader)
|
||||
return fmt.Sprintf("%x", h.Sum(nil))
|
||||
}
|
||||
|
@ -356,3 +417,8 @@ func loadExcludePatterns(dir string, spec sourcev1.GitRepositorySpec) ([]gitigno
|
|||
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
// newHash returns a new SHA1 hash.
|
||||
func newHash() hash.Hash {
|
||||
return sha1.New()
|
||||
}
|
||||
|
|
|
@ -159,7 +159,7 @@ func createArchive(t *testing.T, storage *Storage, filenames []string, sourceIgn
|
|||
t.Fatalf("artifact directory creation failed: %v", err)
|
||||
}
|
||||
|
||||
if err := storage.Archive(artifact, gitDir, spec); err != nil {
|
||||
if err := storage.Archive(&artifact, gitDir, spec); err != nil {
|
||||
t.Fatalf("archiving failed: %v", err)
|
||||
}
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -10,7 +10,7 @@ require (
|
|||
github.com/fluxcd/pkg/helmtestserver v0.0.1
|
||||
github.com/fluxcd/pkg/lockedfile v0.0.5
|
||||
github.com/fluxcd/pkg/recorder v0.0.6
|
||||
github.com/fluxcd/pkg/runtime v0.0.0-20200909163337-e7e634246495
|
||||
github.com/fluxcd/pkg/runtime v0.0.1
|
||||
github.com/fluxcd/pkg/ssh v0.0.5
|
||||
github.com/fluxcd/pkg/untar v0.0.5
|
||||
github.com/fluxcd/source-controller/api v0.0.14
|
||||
|
|
4
go.sum
4
go.sum
|
@ -210,8 +210,8 @@ github.com/fluxcd/pkg/lockedfile v0.0.5 h1:C3T8wfdff1UY1bvplmCkGOLrdMWJHO8Q8+tdl
|
|||
github.com/fluxcd/pkg/lockedfile v0.0.5/go.mod h1:uAtPUBId6a2RqO84MTH5HKGX0SbM1kNW3Wr/FhYyDVA=
|
||||
github.com/fluxcd/pkg/recorder v0.0.6 h1:me/n8syeeGXz50OXoPX3jgIj9AtinvhHdKT9Dy+MbHs=
|
||||
github.com/fluxcd/pkg/recorder v0.0.6/go.mod h1:IfQxfVRSNsWs3B0Yp5B6ObEWwKHILlAx8N7XkoDdhFg=
|
||||
github.com/fluxcd/pkg/runtime v0.0.0-20200909163337-e7e634246495 h1:zhtLz8iRtJWK+jKq9vi9Si4QbcAC2KvQZpQ55DRzLsU=
|
||||
github.com/fluxcd/pkg/runtime v0.0.0-20200909163337-e7e634246495/go.mod h1:cU1t0+Ld39pZjMyrrHukw1E++OZFNHxG2qAExfDWQ34=
|
||||
github.com/fluxcd/pkg/runtime v0.0.1 h1:h8jztHVF9UMGD7XBQSfXDdw80bpT6BOkd0xe4kknPL0=
|
||||
github.com/fluxcd/pkg/runtime v0.0.1/go.mod h1:cU1t0+Ld39pZjMyrrHukw1E++OZFNHxG2qAExfDWQ34=
|
||||
github.com/fluxcd/pkg/ssh v0.0.5 h1:rnbFZ7voy2JBlUfMbfyqArX2FYaLNpDhccGFC3qW83A=
|
||||
github.com/fluxcd/pkg/ssh v0.0.5/go.mod h1:7jXPdXZpc0ttMNz2kD9QuMi3RNn/e0DOFbj0Tij/+Hs=
|
||||
github.com/fluxcd/pkg/testserver v0.0.2 h1:SoaMtO9cE5p/wl2zkGudzflnEHd9mk68CGjZOo7w0Uk=
|
||||
|
|
Loading…
Reference in New Issue