Merge pull request #5 from fluxcd/api-v1alpha1

Introduce storage abstraction and locking mechanism
This commit is contained in:
Stefan Prodan 2020-04-11 09:10:24 +03:00 committed by GitHub
commit 686fdd49dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 943 additions and 320 deletions

View File

@ -13,6 +13,7 @@ RUN go mod download
COPY main.go main.go COPY main.go main.go
COPY api/ api/ COPY api/ api/
COPY controllers/ controllers/ COPY controllers/ controllers/
COPY internal/ internal/
# build # build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o source-controller main.go RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o source-controller main.go

View File

@ -5,22 +5,24 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
// RepositoryCondition contains condition information for a repository // SourceCondition contains condition information for a source
type RepositoryCondition struct { type SourceCondition struct {
// Type of the condition, currently ('Ready'). // Type of the condition, currently ('Ready').
Type RepositoryConditionType `json:"type"` // +required
Type string `json:"type"`
// Status of the condition, one of ('True', 'False', 'Unknown'). // Status of the condition, one of ('True', 'False', 'Unknown').
// +required
Status corev1.ConditionStatus `json:"status"` Status corev1.ConditionStatus `json:"status"`
// LastTransitionTime is the timestamp corresponding to the last status // LastTransitionTime is the timestamp corresponding to the last status
// change of this condition. // change of this condition.
// +optional // +required
LastTransitionTime *metav1.Time `json:"lastTransitionTime,omitempty"` LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
// Reason is a brief machine readable explanation for the condition's last // Reason is a brief machine readable explanation for the condition's last
// transition. // transition.
// +optional // +required
Reason string `json:"reason,omitempty"` Reason string `json:"reason,omitempty"`
// Message is a human readable description of the details of the last // Message is a human readable description of the details of the last
@ -29,11 +31,16 @@ type RepositoryCondition struct {
Message string `json:"message,omitempty"` Message string `json:"message,omitempty"`
} }
// RepositoryConditionType represents an repository condition value
type RepositoryConditionType string
const ( const (
// RepositoryConditionReady represents the fact that a given repository condition // ReadyCondition represents the fact that a given source is in ready state.
// is in ready state. ReadyCondition string = "Ready"
RepositoryConditionReady RepositoryConditionType = "Ready"
// InitializingReason represents the fact that a given source is being initialized.
InitializingReason string = "Initializing"
// StorageOperationFailedReason signals a failure caused by a storage operation.
StorageOperationFailedReason string = "StorageOperationFailed"
// URLInvalidReason represents the fact that a given source has an invalid URL.
URLInvalidReason string = "URLInvalid"
) )

View File

@ -22,12 +22,13 @@ import (
// GitRepositorySpec defines the desired state of GitRepository // GitRepositorySpec defines the desired state of GitRepository
type GitRepositorySpec struct { type GitRepositorySpec struct {
// +kubebuilder:validation:Pattern="^(http|https|ssh)://"
// The repository URL, can be a HTTP or SSH address. // The repository URL, can be a HTTP or SSH address.
Url string `json:"url"` // +kubebuilder:validation:Pattern="^(http|https|ssh)://"
// +required
URL string `json:"url"`
// The interval at which to check for repository updates. // The interval at which to check for repository updates.
// +required
Interval metav1.Duration `json:"interval"` Interval metav1.Duration `json:"interval"`
// The git branch to checkout, defaults to ('master'). // The git branch to checkout, defaults to ('master').
@ -46,16 +47,16 @@ type GitRepositorySpec struct {
// GitRepositoryStatus defines the observed state of GitRepository // GitRepositoryStatus defines the observed state of GitRepository
type GitRepositoryStatus struct { type GitRepositoryStatus struct {
// +optional // +optional
Conditions []RepositoryCondition `json:"conditions,omitempty"` Conditions []SourceCondition `json:"conditions,omitempty"`
// LastUpdateTime is the timestamp corresponding to the last status // LastUpdateTime is the timestamp corresponding to the last status
// change of this repository. // change of this repository.
// +optional // +optional
LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"` LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"`
// Path to the artifacts of the last repository sync. // Path to the artifact output of the last repository sync.
// +optional // +optional
Artifacts string `json:"artifacts,omitempty"` Artifact string `json:"artifacts,omitempty"`
} }
// +kubebuilder:object:root=true // +kubebuilder:object:root=true
@ -74,9 +75,8 @@ type GitRepository struct {
Status GitRepositoryStatus `json:"status,omitempty"` Status GitRepositoryStatus `json:"status,omitempty"`
} }
// +kubebuilder:object:root=true
// GitRepositoryList contains a list of GitRepository // GitRepositoryList contains a list of GitRepository
// +kubebuilder:object:root=true
type GitRepositoryList struct { type GitRepositoryList struct {
metav1.TypeMeta `json:",inline"` metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"` metav1.ListMeta `json:"metadata,omitempty"`
@ -86,3 +86,8 @@ type GitRepositoryList struct {
func init() { func init() {
SchemeBuilder.Register(&GitRepository{}, &GitRepositoryList{}) SchemeBuilder.Register(&GitRepository{}, &GitRepositoryList{})
} }
const (
GitOperationSucceedReason string = "GitOperationSucceed"
GitOperationFailedReason string = "GitOperationFailed"
)

View File

@ -24,16 +24,18 @@ import (
type HelmRepositorySpec struct { type HelmRepositorySpec struct {
// The repository address // The repository address
// +kubebuilder:validation:MinLength=4 // +kubebuilder:validation:MinLength=4
// +required
URL string `json:"url"` URL string `json:"url"`
// The interval at which to check for repository updates // The interval at which to check for repository updates
// +required
Interval metav1.Duration `json:"interval"` Interval metav1.Duration `json:"interval"`
} }
// HelmRepositoryStatus defines the observed state of HelmRepository // HelmRepositoryStatus defines the observed state of HelmRepository
type HelmRepositoryStatus struct { type HelmRepositoryStatus struct {
// +optional // +optional
Conditions []RepositoryCondition `json:"conditions,omitempty"` Conditions []SourceCondition `json:"conditions,omitempty"`
// LastUpdateTime is the timestamp corresponding to the last status // LastUpdateTime is the timestamp corresponding to the last status
// change of this repository. // change of this repository.
@ -61,9 +63,8 @@ type HelmRepository struct {
Status HelmRepositoryStatus `json:"status,omitempty"` Status HelmRepositoryStatus `json:"status,omitempty"`
} }
// +kubebuilder:object:root=true
// HelmRepositoryList contains a list of HelmRepository // HelmRepositoryList contains a list of HelmRepository
// +kubebuilder:object:root=true
type HelmRepositoryList struct { type HelmRepositoryList struct {
metav1.TypeMeta `json:",inline"` metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"` metav1.ListMeta `json:"metadata,omitempty"`
@ -75,7 +76,11 @@ func init() {
} }
const ( const (
InvalidHelmRepositoryURLReason string = "InvalidHelmRepositoryURL" // IndexationFailedReason represents the fact that the indexation
IndexFetchFailedReason string = "IndexFetchFailedReason" // of the given Helm repository failed.
IndexFetchSucceededReason string = "IndexFetchSucceed" IndexationFailedReason string = "IndexationFailed"
// IndexationSucceededReason represents the fact that the indexation
// of the given Helm repository succeeded.
IndexationSucceededReason string = "IndexationSucceed"
) )

View File

@ -104,7 +104,7 @@ func (in *GitRepositoryStatus) DeepCopyInto(out *GitRepositoryStatus) {
*out = *in *out = *in
if in.Conditions != nil { if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions in, out := &in.Conditions, &out.Conditions
*out = make([]RepositoryCondition, len(*in)) *out = make([]SourceCondition, len(*in))
for i := range *in { for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i]) (*in)[i].DeepCopyInto(&(*out)[i])
} }
@ -205,7 +205,7 @@ func (in *HelmRepositoryStatus) DeepCopyInto(out *HelmRepositoryStatus) {
*out = *in *out = *in
if in.Conditions != nil { if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions in, out := &in.Conditions, &out.Conditions
*out = make([]RepositoryCondition, len(*in)) *out = make([]SourceCondition, len(*in))
for i := range *in { for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i]) (*in)[i].DeepCopyInto(&(*out)[i])
} }
@ -227,20 +227,17 @@ func (in *HelmRepositoryStatus) DeepCopy() *HelmRepositoryStatus {
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RepositoryCondition) DeepCopyInto(out *RepositoryCondition) { func (in *SourceCondition) DeepCopyInto(out *SourceCondition) {
*out = *in *out = *in
if in.LastTransitionTime != nil { in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime)
in, out := &in.LastTransitionTime, &out.LastTransitionTime
*out = (*in).DeepCopy()
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RepositoryCondition. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SourceCondition.
func (in *RepositoryCondition) DeepCopy() *RepositoryCondition { func (in *SourceCondition) DeepCopy() *SourceCondition {
if in == nil { if in == nil {
return nil return nil
} }
out := new(RepositoryCondition) out := new(SourceCondition)
in.DeepCopyInto(out) in.DeepCopyInto(out)
return out return out
} }

View File

@ -73,12 +73,12 @@ spec:
description: GitRepositoryStatus defines the observed state of GitRepository description: GitRepositoryStatus defines the observed state of GitRepository
properties: properties:
artifacts: artifacts:
description: Path to the artifacts of the last repository sync. description: Path to the artifact output of the last repository sync.
type: string type: string
conditions: conditions:
items: items:
description: RepositoryCondition contains condition information for description: SourceCondition contains condition information for a
a repository source
properties: properties:
lastTransitionTime: lastTransitionTime:
description: LastTransitionTime is the timestamp corresponding description: LastTransitionTime is the timestamp corresponding

View File

@ -64,13 +64,12 @@ spec:
description: HelmRepositoryStatus defines the observed state of HelmRepository description: HelmRepositoryStatus defines the observed state of HelmRepository
properties: properties:
artifact: artifact:
description: Path to the artifact (index file) of the last repository description: Path to the artifact of the last repository index.
sync.
type: string type: string
conditions: conditions:
items: items:
description: RepositoryCondition contains condition information for description: SourceCondition contains condition information for a
a repository source
properties: properties:
lastTransitionTime: lastTransitionTime:
description: LastTransitionTime is the timestamp corresponding description: LastTransitionTime is the timestamp corresponding

44
controllers/conditions.go Normal file
View File

@ -0,0 +1,44 @@
/*
Copyright 2020 The Flux CD contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
)
func ReadyCondition(reason, message string) sourcev1.SourceCondition {
return sourcev1.SourceCondition{
Type: sourcev1.ReadyCondition,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
}
}
func NotReadyCondition(reason, message string) sourcev1.SourceCondition {
return sourcev1.SourceCondition{
Type: sourcev1.ReadyCondition,
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
}
}

View File

@ -21,8 +21,6 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"os/exec"
"path/filepath"
"strings" "strings"
"time" "time"
@ -44,9 +42,10 @@ import (
// GitRepositoryReconciler reconciles a GitRepository object // GitRepositoryReconciler reconciles a GitRepository object
type GitRepositoryReconciler struct { type GitRepositoryReconciler struct {
client.Client client.Client
Log logr.Logger Log logr.Logger
Scheme *runtime.Scheme Scheme *runtime.Scheme
StoragePath string Storage *Storage
Kind string
} }
// +kubebuilder:rbac:groups=source.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
@ -56,7 +55,7 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel() defer cancel()
log := r.Log.WithValues("gitrepository", req.NamespacedName) log := r.Log.WithValues(r.Kind, req.NamespacedName)
var repo sourcev1.GitRepository var repo sourcev1.GitRepository
if err := r.Get(ctx, req.NamespacedName, &repo); err != nil { if err := r.Get(ctx, req.NamespacedName, &repo); err != nil {
@ -66,40 +65,35 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
result := ctrl.Result{RequeueAfter: repo.Spec.Interval.Duration} result := ctrl.Result{RequeueAfter: repo.Spec.Interval.Duration}
// set initial status // set initial status
if r.shouldResetStatus(repo) { if reset, status := r.shouldResetStatus(repo); reset {
log.Info("Initialising repository") log.Info("Initializing repository")
repo.Status.Artifacts = "" repo.Status = status
repo.Status.LastUpdateTime = nil
repo.Status.Conditions = []sourcev1.RepositoryCondition{
{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionUnknown,
},
}
if err := r.Status().Update(ctx, &repo); err != nil { if err := r.Status().Update(ctx, &repo); err != nil {
log.Error(err, "unable to update GitRepository status") log.Error(err, "unable to update GitRepository status")
return result, err return result, err
} }
} }
// try to remove old artifacts
r.gc(repo)
// try git clone // try git clone
readyCondition, artifacts, err := r.sync(repo) readyCondition, artifacts, err := r.sync(repo)
if err != nil { if err != nil {
log.Info("Repository sync failed", "error", err.Error()) log.Info("Repository sync failed", "error", err.Error())
} else { } else {
// update artifacts if commit hash changed // update artifacts if commit hash changed
if repo.Status.Artifacts != artifacts { if repo.Status.Artifact != artifacts {
timeNew := metav1.Now() timeNew := metav1.Now()
repo.Status.LastUpdateTime = &timeNew repo.Status.LastUpdateTime = &timeNew
repo.Status.Artifacts = artifacts repo.Status.Artifact = artifacts
} }
log.Info("Repository sync succeeded", "msg", readyCondition.Message) log.Info("Repository sync succeeded", "msg", readyCondition.Message)
} }
// update status // update status
timeNew := metav1.Now() readyCondition.LastTransitionTime = metav1.Now()
readyCondition.LastTransitionTime = &timeNew repo.Status.Conditions = []sourcev1.SourceCondition{readyCondition}
repo.Status.Conditions = []sourcev1.RepositoryCondition{readyCondition}
if err := r.Status().Update(ctx, &repo); err != nil { if err := r.Status().Update(ctx, &repo); err != nil {
log.Error(err, "unable to update GitRepository status") log.Error(err, "unable to update GitRepository status")
@ -117,14 +111,13 @@ func (r *GitRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
WithEventFilter(predicate.Funcs{ WithEventFilter(predicate.Funcs{
DeleteFunc: func(e event.DeleteEvent) bool { DeleteFunc: func(e event.DeleteEvent) bool {
// delete artifacts // delete artifacts
repoDir := filepath.Join(r.StoragePath, artifact := r.Storage.ArtifactFor(r.Kind, e.Meta, "dummy")
fmt.Sprintf("repositories/%s-%s", e.Meta.GetName(), e.Meta.GetNamespace())) if err := r.Storage.RemoveAll(artifact); err != nil {
if err := os.RemoveAll(repoDir); err != nil {
r.Log.Error(err, "unable to delete artifacts", r.Log.Error(err, "unable to delete artifacts",
"gitrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) r.Kind, fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName()))
} else { } else {
r.Log.Info("Repository artifacts deleted", r.Log.Info("Repository artifacts deleted",
"gitrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) r.Kind, fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName()))
} }
return false return false
}, },
@ -132,69 +125,49 @@ func (r *GitRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r) Complete(r)
} }
func (r *GitRepositoryReconciler) sync(gr sourcev1.GitRepository) (sourcev1.RepositoryCondition, string, error) { func (r *GitRepositoryReconciler) sync(repository sourcev1.GitRepository) (sourcev1.SourceCondition, string, error) {
// determine ref // determine ref
refName := plumbing.NewBranchReferenceName("master") refName := plumbing.NewBranchReferenceName("master")
if gr.Spec.Branch != "" { if repository.Spec.Branch != "" {
refName = plumbing.NewBranchReferenceName(gr.Spec.Branch) refName = plumbing.NewBranchReferenceName(repository.Spec.Branch)
} }
if gr.Spec.Tag != "" { if repository.Spec.Tag != "" {
refName = plumbing.NewTagReferenceName(gr.Spec.Tag) refName = plumbing.NewTagReferenceName(repository.Spec.Tag)
} }
// create tmp dir // create tmp dir
dir, err := ioutil.TempDir("", gr.Name) dir, err := ioutil.TempDir("", repository.Name)
if err != nil { if err != nil {
ex := fmt.Errorf("tmp dir error %w", err) err = fmt.Errorf("tmp dir error %w", err)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: "ExecFailed",
Message: ex.Error(),
}, "", ex
} }
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
// clone to tmp // clone to tmp
repo, err := git.PlainClone(dir, false, &git.CloneOptions{ repo, err := git.PlainClone(dir, false, &git.CloneOptions{
URL: gr.Spec.Url, URL: repository.Spec.URL,
Depth: 2, Depth: 2,
ReferenceName: refName, ReferenceName: refName,
SingleBranch: true, SingleBranch: true,
Tags: git.AllTags, Tags: git.AllTags,
}) })
if err != nil { if err != nil {
ex := fmt.Errorf("git clone error %w", err) err = fmt.Errorf("git clone error %w", err)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: "GitCloneFailed",
Message: ex.Error(),
}, "", ex
} }
// checkout tag based on semver expression // checkout tag based on semver expression
if gr.Spec.SemVer != "" { if repository.Spec.SemVer != "" {
rng, err := semver.ParseRange(gr.Spec.SemVer) rng, err := semver.ParseRange(repository.Spec.SemVer)
if err != nil { if err != nil {
ex := fmt.Errorf("semver parse range error %w", err) err = fmt.Errorf("semver parse range error %w", err)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: "GitCloneFailed",
Message: ex.Error(),
}, "", ex
} }
repoTags, err := repo.Tags() repoTags, err := repo.Tags()
if err != nil { if err != nil {
ex := fmt.Errorf("git list tags error %w", err) err = fmt.Errorf("git list tags error %w", err)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: "GitCloneFailed",
Message: ex.Error(),
}, "", ex
} }
tags := make(map[string]string) tags := make(map[string]string)
@ -221,112 +194,92 @@ func (r *GitRepositoryReconciler) sync(gr sourcev1.GitRepository) (sourcev1.Repo
w, err := repo.Worktree() w, err := repo.Worktree()
if err != nil { if err != nil {
ex := fmt.Errorf("git worktree error %w", err) err = fmt.Errorf("git worktree error %w", err)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: "GitCheckoutFailed",
Message: ex.Error(),
}, "", ex
} }
err = w.Checkout(&git.CheckoutOptions{ err = w.Checkout(&git.CheckoutOptions{
Hash: plumbing.NewHash(commit), Hash: plumbing.NewHash(commit),
}) })
if err != nil { if err != nil {
ex := fmt.Errorf("git checkout error %w", err) err = fmt.Errorf("git checkout error %w", err)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: "GitCheckoutFailed",
Message: ex.Error(),
}, "", ex
} }
} else { } else {
ex := fmt.Errorf("no match found for semver %s", gr.Spec.SemVer) err = fmt.Errorf("no match found for semver %s", repository.Spec.SemVer)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: "GitCheckoutFailed",
Message: ex.Error(),
}, "", ex
} }
} }
// read commit hash // read commit hash
ref, err := repo.Head() ref, err := repo.Head()
if err != nil { if err != nil {
ex := fmt.Errorf("git resolve HEAD error %w", err) err = fmt.Errorf("git resolve HEAD error %w", err)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: "GitHeadFailed",
Message: ex.Error(),
}, "", ex
} }
// create artifacts dir artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(),
repoDir := fmt.Sprintf("repositories/%s-%s", gr.Name, gr.Namespace) fmt.Sprintf("%s.tar.gz", ref.Hash().String()))
storage := filepath.Join(r.StoragePath, repoDir)
err = os.MkdirAll(storage, 0777) // create artifact dir
err = r.Storage.MkdirAll(artifact)
if err != nil { if err != nil {
ex := fmt.Errorf("mkdir dir error %w", err) err = fmt.Errorf("mkdir dir error %w", err)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: "ExecFailed",
Message: ex.Error(),
}, "", ex
} }
// store artifacts // acquire lock
artifacts := filepath.Join(storage, fmt.Sprintf("%s.tar.gz", ref.Hash().String())) unlock, err := r.Storage.Lock(artifact)
excludes := "--exclude=\\*.{jpg,jpeg,gif,png,wmv,flv,tar.gz,zip} --exclude .git"
command := exec.Command("/bin/sh", "-c",
fmt.Sprintf("cd %s && tar -c %s -f - . | gzip > %s", dir, excludes, artifacts))
err = command.Run()
if err != nil { if err != nil {
ex := fmt.Errorf("tar %s error %w", artifacts, err) err = fmt.Errorf("unable to acquire lock: %w", err)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady, }
Status: corev1.ConditionFalse, defer unlock()
Reason: "ExecFailed",
Message: ex.Error(), // archive artifact
}, "", ex err = r.Storage.Archive(artifact, dir, "")
if err != nil {
err = fmt.Errorf("storage error %w", err)
return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err
} }
// compose artifacts URL message := fmt.Sprintf("Artifact is available at %s", artifact.Path)
hostname := "localhost" return ReadyCondition(sourcev1.GitOperationSucceedReason, message), artifact.URL, nil
if os.Getenv("RUNTIME_NAMESPACE") != "" {
svcParts := strings.Split(os.Getenv("HOSTNAME"), "-")
hostname = fmt.Sprintf("%s.%s",
strings.Join(svcParts[:len(svcParts)-2], "-"), os.Getenv("RUNTIME_NAMESPACE"))
}
artifactsURL := fmt.Sprintf("http://%s/repositories/%s-%s/%s.tar.gz",
hostname, gr.Name, gr.Namespace, ref.Hash().String())
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionTrue,
Reason: "GitCloneSucceed",
Message: fmt.Sprintf("Fetched artifacts are available at %s", artifacts),
}, artifactsURL, nil
} }
func (r *GitRepositoryReconciler) shouldResetStatus(gr sourcev1.GitRepository) bool { func (r *GitRepositoryReconciler) shouldResetStatus(repository sourcev1.GitRepository) (bool, sourcev1.GitRepositoryStatus) {
resetStatus := false resetStatus := false
if gr.Status.Artifacts != "" { if repository.Status.Artifact != "" {
pathParts := strings.Split(gr.Status.Artifacts, "/") parts := strings.Split(repository.Status.Artifact, "/")
path := fmt.Sprintf("repositories/%s-%s/%s", gr.Name, gr.Namespace, pathParts[len(pathParts)-1]) artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), parts[len(parts)-1])
if _, err := os.Stat(filepath.Join(r.StoragePath, path)); err != nil { if !r.Storage.ArtifactExist(artifact) {
resetStatus = true resetStatus = true
} }
} }
// set initial status // set initial status
if len(gr.Status.Conditions) == 0 || resetStatus { if len(repository.Status.Conditions) == 0 || resetStatus {
resetStatus = true resetStatus = true
} }
return resetStatus return resetStatus, sourcev1.GitRepositoryStatus{
Conditions: []sourcev1.SourceCondition{
{
Type: sourcev1.ReadyCondition,
Status: corev1.ConditionUnknown,
Reason: sourcev1.InitializingReason,
LastTransitionTime: metav1.Now(),
},
},
}
}
func (r *GitRepositoryReconciler) gc(repository sourcev1.GitRepository) {
if repository.Status.Artifact != "" {
parts := strings.Split(repository.Status.Artifact, "/")
artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), parts[len(parts)-1])
if err := r.Storage.RemoveAllButCurrent(artifact); err != nil {
r.Log.Info("Artifacts GC failed", "error", err)
}
}
} }

View File

@ -18,13 +18,10 @@ package controllers
import ( import (
"context" "context"
"crypto/sha1"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/url" "net/url"
"os"
"path" "path"
"path/filepath"
"strings" "strings"
"time" "time"
@ -46,10 +43,11 @@ import (
// HelmRepositoryReconciler reconciles a HelmRepository object // HelmRepositoryReconciler reconciles a HelmRepository object
type HelmRepositoryReconciler struct { type HelmRepositoryReconciler struct {
client.Client client.Client
Log logr.Logger Log logr.Logger
Scheme *runtime.Scheme Scheme *runtime.Scheme
StoragePath string Storage *Storage
Getters getter.Providers Kind string
Getters getter.Providers
} }
// +kubebuilder:rbac:groups=source.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete
@ -69,21 +67,18 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
result := ctrl.Result{RequeueAfter: repository.Spec.Interval.Duration} result := ctrl.Result{RequeueAfter: repository.Spec.Interval.Duration}
// set initial status // set initial status
if r.shouldResetStatus(repository) { if reset, status := r.shouldResetStatus(repository); reset {
log.Info("Initialising repository") log.Info("Initializing repository")
repository.Status = sourcev1.HelmRepositoryStatus{} repository.Status = status
repository.Status.Conditions = []sourcev1.RepositoryCondition{
{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionUnknown,
},
}
if err := r.Status().Update(ctx, &repository); err != nil { if err := r.Status().Update(ctx, &repository); err != nil {
log.Error(err, "unable to update HelmRepository status") log.Error(err, "unable to update HelmRepository status")
return result, err return result, err
} }
} }
// try to remove old artifacts
r.gc(repository)
// try to download index // try to download index
readyCondition, artifact, err := r.index(repository) readyCondition, artifact, err := r.index(repository)
if err != nil { if err != nil {
@ -99,9 +94,8 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
} }
// update status // update status
timeNew := metav1.Now() readyCondition.LastTransitionTime = metav1.Now()
readyCondition.LastTransitionTime = &timeNew repository.Status.Conditions = []sourcev1.SourceCondition{readyCondition}
repository.Status.Conditions = []sourcev1.RepositoryCondition{readyCondition}
if err := r.Status().Update(ctx, &repository); err != nil { if err := r.Status().Update(ctx, &repository); err != nil {
log.Error(err, "unable to update HelmRepository status") log.Error(err, "unable to update HelmRepository status")
@ -119,14 +113,13 @@ func (r *HelmRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
WithEventFilter(predicate.Funcs{ WithEventFilter(predicate.Funcs{
DeleteFunc: func(e event.DeleteEvent) bool { DeleteFunc: func(e event.DeleteEvent) bool {
// delete artifacts // delete artifacts
repoDir := filepath.Join(r.StoragePath, artifact := r.Storage.ArtifactFor(r.Kind, e.Meta, "dummy")
fmt.Sprintf("helmrepositories/%s-%s", e.Meta.GetName(), e.Meta.GetNamespace())) if err := r.Storage.RemoveAll(artifact); err != nil {
if err := os.RemoveAll(repoDir); err != nil {
r.Log.Error(err, "unable to delete artifacts", r.Log.Error(err, "unable to delete artifacts",
"helmrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) r.Kind, fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName()))
} else { } else {
r.Log.Info("Helm repository artifacts deleted", r.Log.Info("Repository artifacts deleted",
"helmrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) r.Kind, fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName()))
} }
return false return false
}, },
@ -134,25 +127,15 @@ func (r *HelmRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r) Complete(r)
} }
func (r *HelmRepositoryReconciler) index(repository sourcev1.HelmRepository) (sourcev1.RepositoryCondition, string, error) { func (r *HelmRepositoryReconciler) index(repository sourcev1.HelmRepository) (sourcev1.SourceCondition, string, error) {
u, err := url.Parse(repository.Spec.URL) u, err := url.Parse(repository.Spec.URL)
if err != nil { if err != nil {
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.URLInvalidReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.InvalidHelmRepositoryURLReason,
Message: err.Error(),
}, "", err
} }
c, err := r.Getters.ByScheme(u.Scheme) c, err := r.Getters.ByScheme(u.Scheme)
if err != nil { if err != nil {
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.URLInvalidReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.InvalidHelmRepositoryURLReason,
Message: err.Error(),
}, "", err
} }
u.RawPath = path.Join(u.RawPath, "index.yaml") u.RawPath = path.Join(u.RawPath, "index.yaml")
@ -162,96 +145,60 @@ func (r *HelmRepositoryReconciler) index(repository sourcev1.HelmRepository) (so
// TODO(hidde): add authentication config // TODO(hidde): add authentication config
res, err := c.Get(indexURL, getter.WithURL(repository.Spec.URL)) res, err := c.Get(indexURL, getter.WithURL(repository.Spec.URL))
if err != nil { if err != nil {
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.IndexationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
} }
index, err := ioutil.ReadAll(res) data, err := ioutil.ReadAll(res)
if err != nil { if err != nil {
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.IndexationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
} }
i := &repo.IndexFile{} i := &repo.IndexFile{}
if err := yaml.Unmarshal(index, i); err != nil { if err := yaml.Unmarshal(data, i); err != nil {
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.IndexationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
} }
b, err := yaml.Marshal(i) index, err := yaml.Marshal(i)
if err != nil { if err != nil {
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.IndexationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
} }
repoPath := fmt.Sprintf("helmrepositories/%s-%s", repository.Name, repository.Namespace) sum := r.Storage.Checksum(index)
storage := filepath.Join(r.StoragePath, repoPath) artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(),
sum := checksum(b) fmt.Sprintf("index-%s.yaml", sum))
indexFileName := fmt.Sprintf("index-%s.yaml", sum)
indexFilePath := filepath.Join(storage, indexFileName)
artifactsURL := fmt.Sprintf("http://%s/helmrepositories/%s/%s", host(), repoPath, indexFileName)
if file, err := os.Stat(indexFilePath); !os.IsNotExist(err) && !file.IsDir() { // create artifact dir
if fb, err := ioutil.ReadFile(indexFilePath); err == nil && sum == checksum(fb) { err = r.Storage.MkdirAll(artifact)
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionTrue,
Reason: "GitCloneSucceed",
Message: fmt.Sprintf("Fetched artifact is available at %s", indexFilePath),
}, artifactsURL, nil
}
}
err = os.MkdirAll(storage, 0755)
if err != nil { if err != nil {
err = fmt.Errorf("unable to create repository index directory: %w", err) err = fmt.Errorf("unable to create repository index directory: %w", err)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
} }
err = ioutil.WriteFile(indexFilePath, index, 0644)
// acquire lock
unlock, err := r.Storage.Lock(artifact)
if err != nil {
err = fmt.Errorf("unable to acquire lock: %w", err)
return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err
}
defer unlock()
// save artifact to storage
err = r.Storage.WriteFile(artifact, index)
if err != nil { if err != nil {
err = fmt.Errorf("unable to write repository index file: %w", err) err = fmt.Errorf("unable to write repository index file: %w", err)
return sourcev1.RepositoryCondition{ return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
} }
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady, message := fmt.Sprintf("Artifact is available at %s", artifact.Path)
Status: corev1.ConditionTrue, return ReadyCondition(sourcev1.IndexationSucceededReason, message), artifact.URL, nil
Reason: sourcev1.IndexFetchSucceededReason,
Message: fmt.Sprintf("Fetched artifact is available at %s", indexFilePath),
}, artifactsURL, nil
} }
func (r *HelmRepositoryReconciler) shouldResetStatus(repository sourcev1.HelmRepository) bool { func (r *HelmRepositoryReconciler) shouldResetStatus(repository sourcev1.HelmRepository) (bool, sourcev1.HelmRepositoryStatus) {
resetStatus := false resetStatus := false
if repository.Status.Artifact != "" { if repository.Status.Artifact != "" {
pathParts := strings.Split(repository.Status.Artifact, "/") parts := strings.Split(repository.Status.Artifact, "/")
path := fmt.Sprintf("helmrepositories/%s-%s/%s", repository.Name, repository.Namespace, pathParts[len(pathParts)-1]) artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), parts[len(parts)-1])
if _, err := os.Stat(filepath.Join(r.StoragePath, path)); err != nil { if !r.Storage.ArtifactExist(artifact) {
resetStatus = true resetStatus = true
} }
} }
@ -261,20 +208,24 @@ func (r *HelmRepositoryReconciler) shouldResetStatus(repository sourcev1.HelmRep
resetStatus = true resetStatus = true
} }
return resetStatus return resetStatus, sourcev1.HelmRepositoryStatus{
} Conditions: []sourcev1.SourceCondition{
{
// Checksum returns the SHA1 checksum for the given bytes as a string. Type: sourcev1.ReadyCondition,
func checksum(b []byte) string { Status: corev1.ConditionUnknown,
return fmt.Sprintf("%x", sha1.Sum(b)) Reason: sourcev1.InitializingReason,
} LastTransitionTime: metav1.Now(),
},
func host() string { },
hostname := "localhost" }
if os.Getenv("RUNTIME_NAMESPACE") != "" { }
svcParts := strings.Split(os.Getenv("HOSTNAME"), "-")
hostname = fmt.Sprintf("%s.%s", func (r *HelmRepositoryReconciler) gc(repository sourcev1.HelmRepository) {
strings.Join(svcParts[:len(svcParts)-2], "-"), os.Getenv("RUNTIME_NAMESPACE")) if repository.Status.Artifact != "" {
parts := strings.Split(repository.Status.Artifact, "/")
artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), parts[len(parts)-1])
if err := r.Storage.RemoveAllButCurrent(artifact); err != nil {
r.Log.Info("Artifacts GC failed", "error", err)
}
} }
return hostname
} }

View File

@ -1,3 +1,19 @@
/*
Copyright 2020 The Flux CD contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers package controllers
import ( import (

161
controllers/storage.go Normal file
View File

@ -0,0 +1,161 @@
/*
Copyright 2020 The Flux CD contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"crypto/sha1"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/fluxcd/source-controller/internal/lockedfile"
)
// Storage manages artifacts stored on the local filesystem and served
// over HTTP by the controller's file server.
type Storage struct {
	// BasePath is the local directory path where the source artifacts are stored.
	BasePath string `json:"basePath"`

	// Hostname is the file server host name used to compose the artifacts URIs.
	Hostname string `json:"hostname"`

	// Timeout for artifacts operations (bounds the archival command in Archive).
	Timeout time.Duration `json:"timeout"`
}
// Artifact represents the output of a source synchronisation.
type Artifact struct {
	// Path is the local file path of this artifact.
	Path string `json:"path"`

	// URL is the HTTP address of this artifact, served from Storage.Hostname.
	URL string `json:"url"`
}
// NewStorage creates the storage helper for a given path and hostname.
// basePath must be an existing directory; timeout bounds artifact
// operations such as Archive.
func NewStorage(basePath string, hostname string, timeout time.Duration) (*Storage, error) {
	fi, err := os.Stat(basePath)
	if err != nil {
		// Covers both "does not exist" and other stat failures (e.g. permission
		// denied). The previous `os.IsNotExist(err) || !f.IsDir()` check would
		// dereference a nil FileInfo and panic on non-NotExist errors.
		return nil, fmt.Errorf("invalid dir path %s", basePath)
	}
	if !fi.IsDir() {
		return nil, fmt.Errorf("invalid dir path %s", basePath)
	}
	return &Storage{
		BasePath: basePath,
		Hostname: hostname,
		Timeout:  timeout,
	}, nil
}
// ArtifactFor returns an artifact for the given Kubernetes object,
// addressed as "<kind>/<name>-<namespace>/<fileName>" both on disk
// (under BasePath) and in the served URL (under Hostname).
func (s *Storage) ArtifactFor(kind string, metadata metav1.Object, fileName string) Artifact {
	relPath := fmt.Sprintf("%s/%s-%s/%s", kind, metadata.GetName(), metadata.GetNamespace(), fileName)
	return Artifact{
		Path: filepath.Join(s.BasePath, relPath),
		URL:  fmt.Sprintf("http://%s/%s", s.Hostname, relPath),
	}
}
// MkdirAll creates the artifact's base directory (and any missing
// ancestors) via os.MkdirAll.
func (s *Storage) MkdirAll(artifact Artifact) error {
	return os.MkdirAll(filepath.Dir(artifact.Path), 0777)
}
// RemoveAll deletes the artifact's base directory and everything it
// contains via os.RemoveAll.
func (s *Storage) RemoveAll(artifact Artifact) error {
	return os.RemoveAll(filepath.Dir(artifact.Path))
}
// RemoveAllButCurrent removes all files in the artifact's base dir excluding
// the current artifact file itself. Removal is best-effort: files that cannot
// be removed are collected and reported in a single error.
func (s *Storage) RemoveAllButCurrent(artifact Artifact) error {
	dir := filepath.Dir(artifact.Path)
	var failed []string
	_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// The previous code ignored this error and dereferenced a nil
			// FileInfo, panicking e.g. when the dir does not exist. Skip
			// unreadable entries instead, keeping the walk best-effort.
			return nil
		}
		if path != artifact.Path && !info.IsDir() {
			if err := os.Remove(path); err != nil {
				failed = append(failed, info.Name())
			}
		}
		return nil
	})
	if len(failed) > 0 {
		return fmt.Errorf("failed to remove files: %s", strings.Join(failed, " "))
	}
	return nil
}
// ArtifactExist returns a boolean indicating whether the artifact file
// exists in storage. Any stat result other than "not exist" (including
// other stat errors) is reported as existing, matching the original logic.
func (s *Storage) ArtifactExist(artifact Artifact) bool {
	_, err := os.Stat(artifact.Path)
	return !os.IsNotExist(err)
}
// Archive creates a tar.gz at the artifact path from the given dir, excluding
// files with the provided comma-separated extensions (and always excluding
// the .git directory). The command is bounded by the Storage timeout.
//
// NOTE(review): dir and artifact.Path are interpolated into a /bin/sh command
// line; if either can contain shell metacharacters this is a command
// injection vector — confirm callers only pass controller-generated paths.
func (s *Storage) Archive(artifact Artifact, dir string, excludes string) error {
	if excludes == "" {
		// Default set of binary/media/archive extensions to leave out.
		excludes = "jpg,jpeg,gif,png,wmv,flv,tar.gz,zip"
	}

	// Cancel the external command if it exceeds the configured timeout.
	ctx, cancel := context.WithTimeout(context.Background(), s.Timeout)
	defer cancel()

	// Brace expansion of the exclude list is performed by the shell.
	tarExcludes := fmt.Sprintf("--exclude=\\*.{%s} --exclude .git", excludes)
	cmd := fmt.Sprintf("cd %s && tar -c %s -f - . | gzip > %s", dir, tarExcludes, artifact.Path)
	command := exec.CommandContext(ctx, "/bin/sh", "-c", cmd)

	err := command.Run()
	if err != nil {
		return fmt.Errorf("command '%s' failed: %w", cmd, err)
	}
	return nil
}
// WriteFile writes the given bytes to the artifact path, skipping the write
// when an identical file (same SHA1 checksum) is already in place.
func (s *Storage) WriteFile(artifact Artifact, data []byte) error {
	sum := s.Checksum(data)
	// Only attempt the content comparison when stat succeeded and the target
	// is a regular file. The previous `!os.IsNotExist(err) && !file.IsDir()`
	// check would dereference a nil FileInfo on other stat errors and panic.
	if fi, err := os.Stat(artifact.Path); err == nil && !fi.IsDir() {
		if fb, err := ioutil.ReadFile(artifact.Path); err == nil && sum == s.Checksum(fb) {
			return nil
		}
	}
	return ioutil.WriteFile(artifact.Path, data, 0644)
}
// Checksum returns the SHA1 checksum of the given bytes, formatted as a
// lowercase hex string.
func (s *Storage) Checksum(b []byte) string {
	digest := sha1.Sum(b)
	return fmt.Sprintf("%x", digest)
}
// Lock acquires an advisory, cross-process lock for the given artifact and
// returns the function that releases it. The lock is backed by a
// "<artifact path>.lock" file managed by lockedfile.Mutex.
func (s *Storage) Lock(artifact Artifact) (unlock func(), err error) {
	lockFile := artifact.Path + ".lock"
	mutex := lockedfile.MutexAt(lockFile)
	return mutex.Lock()
}

View File

@ -0,0 +1,98 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package filelock provides a platform-independent API for advisory file
// locking. Calls to functions in this package on platforms that do not support
// advisory locks will return errors for which IsNotSupported returns true.
package filelock
import (
"errors"
"os"
)
// A File provides the minimal set of methods required to lock an open file.
// File implementations must be usable as map keys.
// The usual implementation is *os.File.
type File interface {
	// Name returns the name of the file.
	Name() string

	// Fd returns a valid file descriptor.
	// (If the File is an *os.File, it must not be closed.)
	Fd() uintptr

	// Stat returns the FileInfo structure describing the file.
	Stat() (os.FileInfo, error)
}
// Lock places an advisory write (exclusive) lock on the file, blocking until
// it can be locked.
//
// If Lock returns nil, no other process will be able to place a read or write
// lock on the file until this process exits, closes f, or calls Unlock on it.
//
// If f's descriptor is already read- or write-locked, the behavior of Lock is
// unspecified.
//
// Closing the file may or may not release the lock promptly. Callers should
// ensure that Unlock is always called when Lock succeeds.
func Lock(f File) error {
	return lock(f, writeLock)
}
// RLock places an advisory read (shared) lock on the file, blocking until it
// can be locked.
//
// If RLock returns nil, no other process will be able to place a write lock on
// the file until this process exits, closes f, or calls Unlock on it.
//
// If f is already read- or write-locked, the behavior of RLock is unspecified.
//
// Closing the file may or may not release the lock promptly. Callers should
// ensure that Unlock is always called if RLock succeeds.
func RLock(f File) error {
	return lock(f, readLock)
}
// Unlock removes an advisory lock placed on f by this process.
//
// The caller must not attempt to unlock a file that is not locked;
// doing so is a contract violation, not a reported error.
func Unlock(f File) error {
	return unlock(f)
}
// String returns the name of the function corresponding to lt
// (Lock, RLock, or Unlock).
func (lt lockType) String() string {
	if lt == readLock {
		return "RLock"
	}
	if lt == writeLock {
		return "Lock"
	}
	return "Unlock"
}
// IsNotSupported returns a boolean indicating whether the error is known to
// report that a function is not supported (possibly for a specific input).
// It is satisfied by ErrNotSupported as well as some syscall errors.
func IsNotSupported(err error) bool {
	// Unwrap known os error wrappers first so raw errno values compare directly.
	return isNotSupported(underlyingError(err))
}
var ErrNotSupported = errors.New("operation not supported")
// underlyingError returns the underlying error for known os error types.
func underlyingError(err error) error {
switch err := err.(type) {
case *os.PathError:
return err.Err
case *os.LinkError:
return err.Err
case *os.SyscallError:
return err.Err
}
return err
}

View File

@ -0,0 +1,44 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build darwin dragonfly freebsd linux netbsd openbsd
package filelock
import (
"os"
"syscall"
)
// lockType selects the flock(2) operation to perform.
type lockType int16

const (
	readLock  lockType = syscall.LOCK_SH // shared (read) lock
	writeLock lockType = syscall.LOCK_EX // exclusive (write) lock
)
// lock applies the requested advisory lock to f via flock(2), retrying for as
// long as the call is interrupted by a signal (EINTR). A failure is wrapped in
// an *os.PathError whose Op names the operation (Lock, RLock, or Unlock).
func lock(f File, lt lockType) (err error) {
	for {
		err = syscall.Flock(int(f.Fd()), int(lt))
		if err != syscall.EINTR {
			break
		}
	}
	if err != nil {
		return &os.PathError{
			Op:   lt.String(),
			Path: f.Name(),
			Err:  err,
		}
	}
	return nil
}
// unlock releases any advisory lock held on f by this process
// (flock with LOCK_UN).
func unlock(f File) error {
	return lock(f, syscall.LOCK_UN)
}
// isNotSupported reports whether err is one of the "operation not supported"
// errnos, or the package's own ErrNotSupported sentinel.
func isNotSupported(err error) bool {
	if err == ErrNotSupported {
		return true
	}
	return err == syscall.ENOSYS || err == syscall.ENOTSUP || err == syscall.EOPNOTSUPP
}

187
internal/lockedfile/lockedfile.go Executable file
View File

@ -0,0 +1,187 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package lockedfile creates and manipulates files whose contents should only
// change atomically.
package lockedfile
import (
"fmt"
"io"
"io/ioutil"
"os"
"runtime"
)
// A File is a locked *os.File.
//
// Closing the file releases the lock.
//
// If the program exits while a file is locked, the operating system releases
// the lock but may not do so promptly: callers must ensure that all locked
// files are closed before exiting.
type File struct {
	osFile
	closed bool // guards against double-Close; see (*File).Close
}
// osFile embeds a *os.File while keeping the pointer itself unexported.
// (When we close a File, it must be the same file descriptor that we opened!)
type osFile struct {
	*os.File
}
// OpenFile is like os.OpenFile, but returns a locked file.
// If flag includes os.O_WRONLY or os.O_RDWR, the file is write-locked;
// otherwise, it is read-locked.
func OpenFile(name string, flag int, perm os.FileMode) (*File, error) {
	var (
		f   = new(File)
		err error
	)
	f.osFile.File, err = openFile(name, flag, perm)
	if err != nil {
		return nil, err
	}

	// Although the operating system will drop locks for open files when the
	// process exits, we want to hold locks for as little time as possible, and
	// we especially don't want to leave a file locked after we're done with it.
	// Our Close method is what releases the locks, so use a finalizer to report
	// missing Close calls on a best-effort basis.
	runtime.SetFinalizer(f, func(f *File) {
		panic(fmt.Sprintf("lockedfile.File %s became unreachable without a call to Close", f.Name()))
	})

	return f, nil
}
// Open is like os.Open, but returns a read-locked file.
func Open(name string) (*File, error) {
	return OpenFile(name, os.O_RDONLY, 0)
}
// Create is like os.Create, but returns a write-locked file.
func Create(name string) (*File, error) {
	return OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
}
// Edit creates the named file with mode 0666 (before umask),
// but does not truncate existing contents.
//
// If Edit succeeds, methods on the returned File can be used for I/O.
// The associated file descriptor has mode O_RDWR and the file is write-locked.
func Edit(name string) (*File, error) {
	return OpenFile(name, os.O_RDWR|os.O_CREATE, 0666)
}
// Close unlocks and closes the underlying file.
//
// Close may be called multiple times; all calls after the first will return a
// non-nil error.
func (f *File) Close() error {
	if f.closed {
		return &os.PathError{
			Op:   "close",
			Path: f.Name(),
			Err:  os.ErrClosed,
		}
	}
	f.closed = true

	err := closeFile(f.osFile.File)
	// Clear the missing-Close finalizer installed by OpenFile.
	runtime.SetFinalizer(f, nil)
	return err
}
// Read opens the named file with a read-lock and returns its contents.
// The lock is released (via Close) before returning.
func Read(name string) ([]byte, error) {
	f, err := Open(name)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	return ioutil.ReadAll(f)
}
// Write opens the named file (creating it with the given permissions if
// needed), then write-locks it and overwrites it with the given content.
func Write(name string, content io.Reader, perm os.FileMode) (err error) {
	f, err := OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
	if err != nil {
		return err
	}

	_, err = io.Copy(f, content)
	// Close releases the lock; report its error only if the copy succeeded.
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	return err
}
// Transform invokes t with the result of reading the named file, with its lock
// still held.
//
// If t returns a nil error, Transform then writes the returned contents back to
// the file, making a best effort to preserve existing contents on error.
//
// t must not modify the slice passed to it.
func Transform(name string, t func([]byte) ([]byte, error)) (err error) {
	f, err := Edit(name)
	if err != nil {
		return err
	}
	// Close (via defer) releases the write lock acquired by Edit.
	defer f.Close()

	old, err := ioutil.ReadAll(f)
	if err != nil {
		return err
	}

	new, err := t(old)
	if err != nil {
		return err
	}

	if len(new) > len(old) {
		// The overall file size is increasing, so write the tail first: if we're
		// about to run out of space on the disk, we would rather detect that
		// failure before we have overwritten the original contents.
		if _, err := f.WriteAt(new[len(old):], int64(len(old))); err != nil {
			// Make a best effort to remove the incomplete tail.
			f.Truncate(int64(len(old)))
			return err
		}
	}

	// We're about to overwrite the old contents. In case of failure, make a best
	// effort to roll back before we close the file.
	defer func() {
		if err != nil {
			if _, err := f.WriteAt(old, 0); err == nil {
				f.Truncate(int64(len(old)))
			}
		}
	}()

	if len(new) >= len(old) {
		if _, err := f.WriteAt(new[:len(old)], 0); err != nil {
			return err
		}
	} else {
		if _, err := f.WriteAt(new, 0); err != nil {
			return err
		}
		// The overall file size is decreasing, so shrink the file to its final size
		// after writing. We do this after writing (instead of before) so that if
		// the write fails, enough filesystem space will likely still be reserved
		// to contain the previous contents.
		if err := f.Truncate(int64(len(new))); err != nil {
			return err
		}
	}

	return nil
}

View File

@ -0,0 +1,64 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !plan9
package lockedfile
import (
"os"
"github.com/fluxcd/source-controller/internal/lockedfile/internal/filelock"
)
// openFile opens the named file and then places an advisory lock on it:
// exclusive for writable modes, shared otherwise. O_TRUNC is deferred until
// after the lock is held so concurrent lockers never observe a truncated file.
func openFile(name string, flag int, perm os.FileMode) (*os.File, error) {
	// On BSD systems, we could add the O_SHLOCK or O_EXLOCK flag to the OpenFile
	// call instead of locking separately, but we have to support separate locking
	// calls for Linux and Windows anyway, so it's simpler to use that approach
	// consistently.

	f, err := os.OpenFile(name, flag&^os.O_TRUNC, perm)
	if err != nil {
		return nil, err
	}

	switch flag & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) {
	case os.O_WRONLY, os.O_RDWR:
		err = filelock.Lock(f)
	default:
		err = filelock.RLock(f)
	}
	if err != nil {
		f.Close()
		return nil, err
	}

	if flag&os.O_TRUNC == os.O_TRUNC {
		if err := f.Truncate(0); err != nil {
			// The documentation for os.O_TRUNC says “if possible, truncate file when
			// opened”, but doesn't define “possible” (golang.org/issue/28699).
			// We'll treat regular files (and symlinks to regular files) as “possible”
			// and ignore errors for the rest.
			if fi, statErr := f.Stat(); statErr != nil || fi.Mode().IsRegular() {
				filelock.Unlock(f)
				f.Close()
				return nil, err
			}
		}
	}

	return f, nil
}
// closeFile unlocks and then closes f, returning the first error encountered.
func closeFile(f *os.File) error {
	// Since locking syscalls operate on file descriptors, we must unlock the file
	// while the descriptor is still valid — that is, before the file is closed —
	// and avoid unlocking files that are already closed.
	err := filelock.Unlock(f)

	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	return err
}

67
internal/lockedfile/mutex.go Executable file
View File

@ -0,0 +1,67 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package lockedfile
import (
"fmt"
"os"
"sync"
)
// A Mutex provides mutual exclusion within and across processes by locking a
// well-known file. Such a file generally guards some other part of the
// filesystem: for example, a Mutex file in a directory might guard access to
// the entire tree rooted in that directory.
//
// Mutex does not implement sync.Locker: unlike a sync.Mutex, a lockedfile.Mutex
// can fail to lock (e.g. if there is a permission error in the filesystem).
//
// Like a sync.Mutex, a Mutex may be included as a field of a larger struct but
// must not be copied after first use. The Path field must be set before first
// use and must not be changed thereafter.
type Mutex struct {
	Path string     // The path to the well-known lock file. Must be non-empty.
	mu   sync.Mutex // A redundant mutex. The race detector doesn't know about file locking, so in tests we may need to lock something that it understands.
}
// MutexAt returns a new Mutex with Path set to the given non-empty path.
// An empty path is a programmer error and panics.
func MutexAt(path string) *Mutex {
	if len(path) == 0 {
		panic("lockedfile.MutexAt: path must be non-empty")
	}
	mu := new(Mutex)
	mu.Path = path
	return mu
}
// String describes the Mutex by its lock-file path.
func (mu *Mutex) String() string {
	return "lockedfile.Mutex(" + mu.Path + ")"
}
// Lock attempts to lock the Mutex.
//
// If successful, Lock returns a non-nil unlock function: it is provided as a
// return-value instead of a separate method to remind the caller to check the
// accompanying error. (See https://golang.org/issue/20803.)
func (mu *Mutex) Lock() (unlock func(), err error) {
	if mu.Path == "" {
		panic("lockedfile.Mutex: missing Path during Lock")
	}

	// We could use either O_RDWR or O_WRONLY here. If we choose O_RDWR and the
	// file at mu.Path is write-only, the call to OpenFile will fail with a
	// permission error. That's actually what we want: if we add an RLock method
	// in the future, it should call OpenFile with O_RDONLY and will require the
	// files must be readable, so we should not let the caller make any
	// assumptions about Mutex working with write-only files.
	f, err := OpenFile(mu.Path, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		return nil, err
	}
	mu.mu.Lock()
	return func() {
		mu.mu.Unlock()
		// Closing the locked file releases the OS-level lock (see File.Close).
		f.Close()
	}, nil
}

60
main.go
View File

@ -18,20 +18,22 @@ package main
import ( import (
"flag" "flag"
"github.com/go-logr/logr" "fmt"
"helm.sh/helm/v3/pkg/getter"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"time"
"github.com/go-logr/logr"
"helm.sh/helm/v3/pkg/getter"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme" clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/log/zap"
sourcev1alpha1 "github.com/fluxcd/source-controller/api/v1alpha1" sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
"github.com/fluxcd/source-controller/controllers" "github.com/fluxcd/source-controller/controllers"
// +kubebuilder:scaffold:imports // +kubebuilder:scaffold:imports
) )
@ -44,7 +46,7 @@ var (
func init() { func init() {
_ = clientgoscheme.AddToScheme(scheme) _ = clientgoscheme.AddToScheme(scheme)
_ = sourcev1alpha1.AddToScheme(scheme) _ = sourcev1.AddToScheme(scheme)
// +kubebuilder:scaffold:scheme // +kubebuilder:scaffold:scheme
} }
@ -76,26 +78,26 @@ func main() {
os.Exit(1) os.Exit(1)
} }
if storagePath == "" { storage := mustInitStorage(storagePath, setupLog)
p, _ := os.Getwd()
storagePath = filepath.Join(p, "bin") go startFileServer(storage.BasePath, storageAddr, setupLog)
}
go startFileServer(storagePath, storageAddr, setupLog)
if err = (&controllers.GitRepositoryReconciler{ if err = (&controllers.GitRepositoryReconciler{
Client: mgr.GetClient(), Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("GitRepository"), Log: ctrl.Log.WithName("controllers").WithName("GitRepository"),
Scheme: mgr.GetScheme(), Scheme: mgr.GetScheme(),
StoragePath: storagePath, Kind: "gitrepository",
Storage: storage,
}).SetupWithManager(mgr); err != nil { }).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "GitRepository") setupLog.Error(err, "unable to create controller", "controller", "GitRepository")
os.Exit(1) os.Exit(1)
} }
if err = (&controllers.HelmRepositoryReconciler{ if err = (&controllers.HelmRepositoryReconciler{
Client: mgr.GetClient(), Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("HelmRepository"), Log: ctrl.Log.WithName("controllers").WithName("HelmRepository"),
Scheme: mgr.GetScheme(), Scheme: mgr.GetScheme(),
StoragePath: storagePath, Kind: "helmrepository",
Storage: storage,
Getters: getter.Providers{ Getters: getter.Providers{
getter.Provider{ getter.Provider{
Schemes: []string{"http", "https"}, Schemes: []string{"http", "https"},
@ -123,3 +125,25 @@ func startFileServer(path string, address string, l logr.Logger) {
l.Error(err, "file server error") l.Error(err, "file server error")
} }
} }
// mustInitStorage validates the storage path (defaulting to <cwd>/bin when
// empty), derives the file server hostname, and returns the storage helper.
// It exits the process when the storage directory is unusable.
func mustInitStorage(path string, l logr.Logger) *controllers.Storage {
	if path == "" {
		p, _ := os.Getwd()
		path = filepath.Join(p, "bin")
	}

	// In-cluster the server is presumably addressed as "<service>.<namespace>",
	// with the service name obtained by stripping the last two dash-separated
	// segments from the pod HOSTNAME — TODO confirm against the Deployment
	// naming scheme.
	hostname := "localhost"
	if os.Getenv("RUNTIME_NAMESPACE") != "" {
		svcParts := strings.Split(os.Getenv("HOSTNAME"), "-")
		// Guard against a HOSTNAME with fewer than two dashes: the unguarded
		// svcParts[:len(svcParts)-2] would panic with a slice-bounds error.
		if len(svcParts) > 2 {
			hostname = fmt.Sprintf("%s.%s",
				strings.Join(svcParts[:len(svcParts)-2], "-"), os.Getenv("RUNTIME_NAMESPACE"))
		}
	}

	storage, err := controllers.NewStorage(path, hostname, 5*time.Minute)
	if err != nil {
		l.Error(err, "unable to initialise storage")
		os.Exit(1)
	}
	return storage
}