Refactor git controller fs operations

- use storage helper in GitRepositoryReconciler
- implement artifacts GC
- rename status artifacts to artifact
This commit is contained in:
stefanprodan 2020-04-10 12:01:06 +03:00
parent d1fb8e1ade
commit ce01399c15
4 changed files with 90 additions and 69 deletions

View File

@ -53,9 +53,9 @@ type GitRepositoryStatus struct {
// +optional
LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"`
// Path to the artifacts of the last repository sync.
// Path to the artifact output of the last repository sync.
// +optional
Artifacts string `json:"artifacts,omitempty"`
Artifact string `json:"artifacts,omitempty"`
}
// +kubebuilder:object:root=true

View File

@ -73,7 +73,7 @@ spec:
description: GitRepositoryStatus defines the observed state of GitRepository
properties:
artifacts:
description: Path to the artifacts of the last repository sync.
description: Path to the artifact output of the last repository sync.
type: string
conditions:
items:

View File

@ -21,8 +21,6 @@ import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
@ -44,9 +42,10 @@ import (
// GitRepositoryReconciler reconciles a GitRepository object
type GitRepositoryReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
StoragePath string
Log logr.Logger
Scheme *runtime.Scheme
Storage *Storage
Kind string
}
// +kubebuilder:rbac:groups=source.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
@ -56,7 +55,7 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
log := r.Log.WithValues("gitrepository", req.NamespacedName)
log := r.Log.WithValues(r.Kind, req.NamespacedName)
var repo sourcev1.GitRepository
if err := r.Get(ctx, req.NamespacedName, &repo); err != nil {
@ -75,16 +74,19 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
}
}
// try to remove old artifacts
r.gc(repo)
// try git clone
readyCondition, artifacts, err := r.sync(repo)
if err != nil {
log.Info("Repository sync failed", "error", err.Error())
} else {
// update artifacts if commit hash changed
if repo.Status.Artifacts != artifacts {
if repo.Status.Artifact != artifacts {
timeNew := metav1.Now()
repo.Status.LastUpdateTime = &timeNew
repo.Status.Artifacts = artifacts
repo.Status.Artifact = artifacts
}
log.Info("Repository sync succeeded", "msg", readyCondition.Message)
}
@ -109,14 +111,13 @@ func (r *GitRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
WithEventFilter(predicate.Funcs{
DeleteFunc: func(e event.DeleteEvent) bool {
// delete artifacts
repoDir := filepath.Join(r.StoragePath,
fmt.Sprintf("repositories/%s-%s", e.Meta.GetName(), e.Meta.GetNamespace()))
if err := os.RemoveAll(repoDir); err != nil {
artifact := r.Storage.ArtifactFor(r.Kind, e.Meta, "dummy")
if err := r.Storage.RemoveAll(artifact); err != nil {
r.Log.Error(err, "unable to delete artifacts",
"gitrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName()))
r.Kind, fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName()))
} else {
r.Log.Info("Repository artifacts deleted",
"gitrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName()))
r.Kind, fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName()))
}
return false
},
@ -124,18 +125,18 @@ func (r *GitRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}
func (r *GitRepositoryReconciler) sync(gr sourcev1.GitRepository) (sourcev1.SourceCondition, string, error) {
func (r *GitRepositoryReconciler) sync(repository sourcev1.GitRepository) (sourcev1.SourceCondition, string, error) {
// determine ref
refName := plumbing.NewBranchReferenceName("master")
if gr.Spec.Branch != "" {
refName = plumbing.NewBranchReferenceName(gr.Spec.Branch)
if repository.Spec.Branch != "" {
refName = plumbing.NewBranchReferenceName(repository.Spec.Branch)
}
if gr.Spec.Tag != "" {
refName = plumbing.NewTagReferenceName(gr.Spec.Tag)
if repository.Spec.Tag != "" {
refName = plumbing.NewTagReferenceName(repository.Spec.Tag)
}
// create tmp dir
dir, err := ioutil.TempDir("", gr.Name)
dir, err := ioutil.TempDir("", repository.Name)
if err != nil {
ex := fmt.Errorf("tmp dir error %w", err)
return sourcev1.SourceCondition{
@ -149,7 +150,7 @@ func (r *GitRepositoryReconciler) sync(gr sourcev1.GitRepository) (sourcev1.Sour
// clone to tmp
repo, err := git.PlainClone(dir, false, &git.CloneOptions{
URL: gr.Spec.Url,
URL: repository.Spec.Url,
Depth: 2,
ReferenceName: refName,
SingleBranch: true,
@ -166,8 +167,8 @@ func (r *GitRepositoryReconciler) sync(gr sourcev1.GitRepository) (sourcev1.Sour
}
// checkout tag based on semver expression
if gr.Spec.SemVer != "" {
rng, err := semver.ParseRange(gr.Spec.SemVer)
if repository.Spec.SemVer != "" {
rng, err := semver.ParseRange(repository.Spec.SemVer)
if err != nil {
ex := fmt.Errorf("semver parse range error %w", err)
return sourcev1.SourceCondition{
@ -235,7 +236,7 @@ func (r *GitRepositoryReconciler) sync(gr sourcev1.GitRepository) (sourcev1.Sour
}, "", ex
}
} else {
ex := fmt.Errorf("no match found for semver %s", gr.Spec.SemVer)
ex := fmt.Errorf("no match found for semver %s", repository.Spec.SemVer)
return sourcev1.SourceCondition{
Type: sourcev1.ReadyCondition,
Status: corev1.ConditionFalse,
@ -257,10 +258,11 @@ func (r *GitRepositoryReconciler) sync(gr sourcev1.GitRepository) (sourcev1.Sour
}, "", ex
}
// create artifacts dir
repoDir := fmt.Sprintf("repositories/%s-%s", gr.Name, gr.Namespace)
storage := filepath.Join(r.StoragePath, repoDir)
err = os.MkdirAll(storage, 0777)
artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(),
fmt.Sprintf("%s.tar.gz", ref.Hash().String()))
// create artifact dir
err = r.Storage.MkdirAll(artifact)
if err != nil {
ex := fmt.Errorf("mkdir dir error %w", err)
return sourcev1.SourceCondition{
@ -271,14 +273,10 @@ func (r *GitRepositoryReconciler) sync(gr sourcev1.GitRepository) (sourcev1.Sour
}, "", ex
}
// store artifacts
artifacts := filepath.Join(storage, fmt.Sprintf("%s.tar.gz", ref.Hash().String()))
excludes := "--exclude=\\*.{jpg,jpeg,gif,png,wmv,flv,tar.gz,zip} --exclude .git"
command := exec.Command("/bin/sh", "-c",
fmt.Sprintf("cd %s && tar -c %s -f - . | gzip > %s", dir, excludes, artifacts))
err = command.Run()
// archive artifact
err = r.Storage.Archive(artifact, dir, "")
if err != nil {
ex := fmt.Errorf("tar %s error %w", artifacts, err)
ex := fmt.Errorf("storage error %w", err)
return sourcev1.SourceCondition{
Type: sourcev1.ReadyCondition,
Status: corev1.ConditionFalse,
@ -287,36 +285,26 @@ func (r *GitRepositoryReconciler) sync(gr sourcev1.GitRepository) (sourcev1.Sour
}, "", ex
}
// compose artifacts URL
hostname := "localhost"
if os.Getenv("RUNTIME_NAMESPACE") != "" {
svcParts := strings.Split(os.Getenv("HOSTNAME"), "-")
hostname = fmt.Sprintf("%s.%s",
strings.Join(svcParts[:len(svcParts)-2], "-"), os.Getenv("RUNTIME_NAMESPACE"))
}
artifactsURL := fmt.Sprintf("http://%s/repositories/%s-%s/%s.tar.gz",
hostname, gr.Name, gr.Namespace, ref.Hash().String())
return sourcev1.SourceCondition{
Type: sourcev1.ReadyCondition,
Status: corev1.ConditionTrue,
Reason: "GitCloneSucceed",
Message: fmt.Sprintf("Fetched artifacts are available at %s", artifacts),
}, artifactsURL, nil
Message: fmt.Sprintf("Artifact is available at %s", artifact.Path),
}, artifact.URL, nil
}
func (r *GitRepositoryReconciler) shouldResetStatus(gr sourcev1.GitRepository) (bool, sourcev1.GitRepositoryStatus) {
func (r *GitRepositoryReconciler) shouldResetStatus(repository sourcev1.GitRepository) (bool, sourcev1.GitRepositoryStatus) {
resetStatus := false
if gr.Status.Artifacts != "" {
pathParts := strings.Split(gr.Status.Artifacts, "/")
path := fmt.Sprintf("repositories/%s-%s/%s", gr.Name, gr.Namespace, pathParts[len(pathParts)-1])
if _, err := os.Stat(filepath.Join(r.StoragePath, path)); err != nil {
if repository.Status.Artifact != "" {
parts := strings.Split(repository.Status.Artifact, "/")
artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), parts[len(parts)-1])
if !r.Storage.ArtifactExist(artifact) {
resetStatus = true
}
}
// set initial status
if len(gr.Status.Conditions) == 0 || resetStatus {
if len(repository.Status.Conditions) == 0 || resetStatus {
resetStatus = true
}
@ -331,3 +319,13 @@ func (r *GitRepositoryReconciler) shouldResetStatus(gr sourcev1.GitRepository) (
},
}
}
func (r *GitRepositoryReconciler) gc(repository sourcev1.GitRepository) {
if repository.Status.Artifact != "" {
parts := strings.Split(repository.Status.Artifact, "/")
artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), parts[len(parts)-1])
if err := r.Storage.RemoveAllButCurrent(artifact); err != nil {
r.Log.Info("Artifacts GC failed", "error", err)
}
}
}

51
main.go
View File

@ -18,20 +18,22 @@ package main
import (
"flag"
"github.com/go-logr/logr"
"helm.sh/helm/v3/pkg/getter"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"github.com/go-logr/logr"
"helm.sh/helm/v3/pkg/getter"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
sourcev1alpha1 "github.com/fluxcd/source-controller/api/v1alpha1"
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
"github.com/fluxcd/source-controller/controllers"
// +kubebuilder:scaffold:imports
)
@ -44,7 +46,7 @@ var (
func init() {
_ = clientgoscheme.AddToScheme(scheme)
_ = sourcev1alpha1.AddToScheme(scheme)
_ = sourcev1.AddToScheme(scheme)
// +kubebuilder:scaffold:scheme
}
@ -76,17 +78,16 @@ func main() {
os.Exit(1)
}
if storagePath == "" {
p, _ := os.Getwd()
storagePath = filepath.Join(p, "bin")
}
go startFileServer(storagePath, storageAddr, setupLog)
storage := mustInitStorage(storagePath, setupLog)
go startFileServer(storage.BasePath, storageAddr, setupLog)
if err = (&controllers.GitRepositoryReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("GitRepository"),
Scheme: mgr.GetScheme(),
StoragePath: storagePath,
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("GitRepository"),
Scheme: mgr.GetScheme(),
Kind: "gitrepository",
Storage: storage,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "GitRepository")
os.Exit(1)
@ -123,3 +124,25 @@ func startFileServer(path string, address string, l logr.Logger) {
l.Error(err, "file server error")
}
}
func mustInitStorage(path string, l logr.Logger) *controllers.Storage {
if path == "" {
p, _ := os.Getwd()
path = filepath.Join(p, "bin")
}
hostname := "localhost"
if os.Getenv("RUNTIME_NAMESPACE") != "" {
svcParts := strings.Split(os.Getenv("HOSTNAME"), "-")
hostname = fmt.Sprintf("%s.%s",
strings.Join(svcParts[:len(svcParts)-2], "-"), os.Getenv("RUNTIME_NAMESPACE"))
}
storage, err := controllers.NewStorage(path, hostname, 5*time.Minute)
if err != nil {
l.Error(err, "unable to initialise storage")
os.Exit(1)
}
return storage
}