mirror of https://github.com/rancher/gitjob.git
409 lines
12 KiB
Go
409 lines
12 KiB
Go
package controller
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/go-logr/logr"
|
|
|
|
v1 "github.com/rancher/gitjob/pkg/apis/gitjob.cattle.io/v1"
|
|
"github.com/rancher/wrangler/v2/pkg/condition"
|
|
"github.com/rancher/wrangler/v2/pkg/kstatus"
|
|
"github.com/rancher/wrangler/v2/pkg/name"
|
|
batchv1 "k8s.io/api/batch/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/client-go/util/retry"
|
|
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
|
|
ctrl "sigs.k8s.io/controller-runtime"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
|
"sigs.k8s.io/controller-runtime/pkg/event"
|
|
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
|
)
|
|
|
|
const (
|
|
bundleCAVolumeName = "additional-ca"
|
|
bundleCAFile = "additional-ca.crt"
|
|
gitCredentialVolumeName = "git-credential" // #nosec G101 this is not a credential
|
|
gitClonerVolumeName = "git-cloner"
|
|
emptyDirVolumeName = "git-cloner-empty-dir"
|
|
)
|
|
|
|
type GitPoller interface {
|
|
AddOrModifyGitRepoWatch(ctx context.Context, gitJob v1.GitJob)
|
|
CleanUpWatches(ctx context.Context)
|
|
}
|
|
|
|
// CronJobReconciler reconciles a GitJob object
|
|
type GitJobReconciler struct {
|
|
client.Client
|
|
Scheme *runtime.Scheme
|
|
Image string
|
|
GitPoller GitPoller
|
|
Log logr.Logger
|
|
}
|
|
|
|
func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
|
return ctrl.NewControllerManagedBy(mgr).
|
|
For(&v1.GitJob{}).
|
|
WithEventFilter(generationOrCommitChangedPredicate()).
|
|
Owns(&batchv1.Job{}).
|
|
Complete(r)
|
|
}
|
|
|
|
// Reconcile is part of the main kubernetes reconciliation loop which aims to
|
|
// move the current state of the cluster closer to the desired state.
|
|
// The Reconcile function compares the state specified by
|
|
// the GitJob object against the actual cluster state, and then
|
|
// performs operations to make the cluster state reflect the state specified by
|
|
// the user.
|
|
//
|
|
// For more details, check Reconcile and its Result here:
|
|
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
|
|
func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
|
var gitJob v1.GitJob
|
|
|
|
if err := r.Get(ctx, req.NamespacedName, &gitJob); err != nil && !errors.IsNotFound(err) {
|
|
return ctrl.Result{}, err
|
|
} else if errors.IsNotFound(err) {
|
|
r.GitPoller.CleanUpWatches(ctx)
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
r.GitPoller.AddOrModifyGitRepoWatch(ctx, gitJob)
|
|
|
|
var job batchv1.Job
|
|
err := r.Get(ctx, types.NamespacedName{
|
|
Namespace: gitJob.Namespace,
|
|
Name: jobName(&gitJob),
|
|
}, &job)
|
|
if err != nil && !errors.IsNotFound(err) {
|
|
return ctrl.Result{}, fmt.Errorf("error retrieving gitJob: %v", err)
|
|
}
|
|
|
|
if errors.IsNotFound(err) && gitJob.Status.Commit != "" {
|
|
if err := r.createJob(ctx, &gitJob); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error creating job: %v", err)
|
|
}
|
|
} else if gitJob.Status.Commit != "" {
|
|
if err = r.deleteJobIfNeeded(ctx, &gitJob, &job); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error deleting job: %v", err)
|
|
}
|
|
}
|
|
|
|
if err = r.updateStatus(ctx, &gitJob, &job); err != nil {
|
|
if errors.IsConflict(err) {
|
|
r.Log.Info("conflict updating status", "message", err)
|
|
return ctrl.Result{Requeue: true}, nil // just retry, but don't show an error
|
|
}
|
|
return ctrl.Result{}, fmt.Errorf("error updating gitjob status: %v", err)
|
|
}
|
|
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
func generationOrCommitChangedPredicate() predicate.Predicate {
|
|
return predicate.Funcs{
|
|
UpdateFunc: func(e event.UpdateEvent) bool {
|
|
oldGitJob, ok := e.ObjectOld.(*v1.GitJob)
|
|
if !ok {
|
|
return true
|
|
}
|
|
newGitJob, ok := e.ObjectNew.(*v1.GitJob)
|
|
if !ok {
|
|
return true
|
|
}
|
|
|
|
return oldGitJob.Generation != newGitJob.Generation || oldGitJob.Status.Commit != newGitJob.Status.Commit
|
|
},
|
|
}
|
|
}
|
|
|
|
func (r *GitJobReconciler) createJob(ctx context.Context, gitJob *v1.GitJob) error {
|
|
job, err := r.newJob(ctx, gitJob)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := controllerutil.SetControllerReference(gitJob, job, r.Scheme); err != nil {
|
|
return err
|
|
}
|
|
err = r.Create(ctx, job)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
var gitJobFomCluster v1.GitJob
|
|
err := r.Get(ctx, types.NamespacedName{Name: gitJob.Name, Namespace: gitJob.Namespace}, &gitJobFomCluster)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
gitJobFomCluster.Status.ObservedGeneration = gitJobFomCluster.Generation
|
|
gitJobFomCluster.Status.LastSyncedTime = metav1.Now()
|
|
|
|
return r.Status().Update(ctx, &gitJobFomCluster)
|
|
})
|
|
|
|
}
|
|
|
|
func (r *GitJobReconciler) updateStatus(ctx context.Context, gitJob *v1.GitJob, job *batchv1.Job) error {
|
|
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(job)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
uJob := &unstructured.Unstructured{Object: obj}
|
|
result, err := status.Compute(uJob)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
gitJob.Status.JobStatus = result.Status.String()
|
|
for _, con := range result.Conditions {
|
|
condition.Cond(con.Type.String()).SetStatus(gitJob, string(con.Status))
|
|
condition.Cond(con.Type.String()).SetMessageIfBlank(gitJob, con.Message)
|
|
condition.Cond(con.Type.String()).Reason(gitJob, con.Reason)
|
|
}
|
|
|
|
if result.Status == status.FailedStatus {
|
|
selector := labels.SelectorFromSet(labels.Set{
|
|
"job-name": job.Name,
|
|
})
|
|
var podList corev1.PodList
|
|
err := r.Client.List(ctx, &podList, &client.ListOptions{LabelSelector: selector})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sort.Slice(podList.Items, func(i, j int) bool {
|
|
return podList.Items[i].CreationTimestamp.Before(&podList.Items[j].CreationTimestamp)
|
|
})
|
|
terminationMessage := result.Message
|
|
if len(podList.Items) > 0 {
|
|
for _, podStatus := range podList.Items[len(podList.Items)-1].Status.ContainerStatuses {
|
|
if podStatus.Name != "step-git-source" && podStatus.State.Terminated != nil {
|
|
terminationMessage += podStatus.State.Terminated.Message
|
|
}
|
|
}
|
|
}
|
|
kstatus.SetError(gitJob, terminationMessage)
|
|
}
|
|
|
|
if result.Status == status.CurrentStatus {
|
|
if strings.Contains(result.Message, "Job Completed") {
|
|
gitJob.Status.LastExecutedCommit = job.Annotations["commit"]
|
|
}
|
|
kstatus.SetActive(gitJob)
|
|
}
|
|
|
|
gitJob.Status.ObservedGeneration = gitJob.Generation
|
|
gitJob.Status.LastSyncedTime = metav1.Now()
|
|
|
|
return r.Status().Update(ctx, gitJob)
|
|
}
|
|
|
|
func (r *GitJobReconciler) deleteJobIfNeeded(ctx context.Context, gitJob *v1.GitJob, job *batchv1.Job) error {
|
|
// if force delete is set, delete the job to make sure a new job is created
|
|
if gitJob.Spec.ForceUpdateGeneration != gitJob.Status.UpdateGeneration {
|
|
gitJob.Status.UpdateGeneration = gitJob.Spec.ForceUpdateGeneration
|
|
r.Log.Info("job deletion triggered because of ForceUpdateGeneration")
|
|
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// k8s Jobs are immutable. Recreate the job if the GitJob Spec has changed.
|
|
if gitJob.Generation != gitJob.Status.ObservedGeneration {
|
|
r.Log.Info("job deletion triggered because of generation change")
|
|
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func jobName(obj *v1.GitJob) string {
|
|
return name.SafeConcatName(obj.Name, name.Hex(obj.Spec.Git.Repo+obj.Status.Commit, 5))
|
|
}
|
|
|
|
func caBundleName(obj *v1.GitJob) string {
|
|
return fmt.Sprintf("%s-cabundle", obj.Name)
|
|
}
|
|
|
|
func (r *GitJobReconciler) newJob(ctx context.Context, obj *v1.GitJob) (*batchv1.Job, error) {
|
|
job := &batchv1.Job{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Annotations: map[string]string{
|
|
"generation": strconv.Itoa(int(obj.Generation)),
|
|
"commit": obj.Status.Commit,
|
|
},
|
|
Namespace: obj.Namespace,
|
|
Name: jobName(obj),
|
|
},
|
|
Spec: obj.Spec.JobSpec,
|
|
}
|
|
|
|
initContainer, err := r.generateInitContainer(ctx, obj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
job.Spec.Template.Spec.InitContainers = []corev1.Container{initContainer}
|
|
job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes,
|
|
corev1.Volume{
|
|
Name: gitClonerVolumeName,
|
|
VolumeSource: corev1.VolumeSource{
|
|
EmptyDir: &corev1.EmptyDirVolumeSource{},
|
|
},
|
|
}, corev1.Volume{
|
|
Name: emptyDirVolumeName,
|
|
VolumeSource: corev1.VolumeSource{
|
|
EmptyDir: &corev1.EmptyDirVolumeSource{},
|
|
},
|
|
},
|
|
)
|
|
|
|
if obj.Spec.Git.CABundle != nil {
|
|
job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, corev1.Volume{
|
|
Name: bundleCAVolumeName,
|
|
VolumeSource: corev1.VolumeSource{
|
|
Secret: &corev1.SecretVolumeSource{
|
|
SecretName: caBundleName(obj),
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
if obj.Spec.Git.ClientSecretName != "" {
|
|
job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes,
|
|
corev1.Volume{
|
|
Name: gitCredentialVolumeName,
|
|
VolumeSource: corev1.VolumeSource{
|
|
Secret: &corev1.SecretVolumeSource{
|
|
SecretName: obj.Spec.Git.ClientSecretName,
|
|
},
|
|
},
|
|
},
|
|
)
|
|
}
|
|
|
|
for i := range job.Spec.Template.Spec.Containers {
|
|
job.Spec.Template.Spec.Containers[i].VolumeMounts = append(job.Spec.Template.Spec.Containers[i].VolumeMounts, corev1.VolumeMount{
|
|
MountPath: "/workspace/source",
|
|
Name: gitClonerVolumeName,
|
|
})
|
|
job.Spec.Template.Spec.Containers[i].Env = append(job.Spec.Template.Spec.Containers[i].Env,
|
|
corev1.EnvVar{
|
|
Name: "COMMIT",
|
|
Value: obj.Status.Commit,
|
|
},
|
|
corev1.EnvVar{
|
|
Name: "EVENT_TYPE",
|
|
Value: obj.Status.Event,
|
|
},
|
|
)
|
|
job.Spec.Template.Spec.Containers[i].Env = append(job.Spec.Template.Spec.Containers[i].Env, proxyEnvVars()...)
|
|
}
|
|
|
|
return job, nil
|
|
}
|
|
|
|
func (r *GitJobReconciler) generateInitContainer(ctx context.Context, obj *v1.GitJob) (corev1.Container, error) {
|
|
args := []string{obj.Spec.Git.Repo, "/workspace"}
|
|
volumeMounts := []corev1.VolumeMount{
|
|
{
|
|
Name: gitClonerVolumeName,
|
|
MountPath: "/workspace",
|
|
},
|
|
{
|
|
Name: emptyDirVolumeName,
|
|
MountPath: "/tmp",
|
|
},
|
|
}
|
|
if obj.Spec.Git.Branch != "" {
|
|
args = append(args, "--branch", obj.Spec.Git.Branch)
|
|
} else if obj.Spec.Git.Revision != "" {
|
|
args = append(args, "--revision", obj.Spec.Git.Revision)
|
|
}
|
|
|
|
if obj.Spec.Git.ClientSecretName != "" {
|
|
var secret corev1.Secret
|
|
if err := r.Get(ctx, types.NamespacedName{
|
|
Namespace: obj.Namespace,
|
|
Name: obj.Spec.Git.ClientSecretName,
|
|
}, &secret); err != nil {
|
|
return corev1.Container{}, err
|
|
}
|
|
|
|
if secret.Type == corev1.SecretTypeBasicAuth {
|
|
volumeMounts = append(volumeMounts, corev1.VolumeMount{
|
|
Name: gitCredentialVolumeName,
|
|
MountPath: "/gitjob/credentials",
|
|
})
|
|
args = append(args, "--username", string(secret.Data[corev1.BasicAuthUsernameKey]))
|
|
args = append(args, "--password-file", "/gitjob/credentials/"+corev1.BasicAuthPasswordKey)
|
|
} else if secret.Type == corev1.SecretTypeSSHAuth {
|
|
volumeMounts = append(volumeMounts, corev1.VolumeMount{
|
|
Name: gitCredentialVolumeName,
|
|
MountPath: "/gitjob/ssh",
|
|
})
|
|
args = append(args, "--ssh-private-key-file", "/gitjob/ssh/"+corev1.SSHAuthPrivateKey)
|
|
knownHosts := secret.Data["known_hosts"]
|
|
if knownHosts != nil {
|
|
args = append(args, "--known-hosts-file", "/gitjob/ssh/known_hosts")
|
|
}
|
|
}
|
|
}
|
|
|
|
if obj.Spec.Git.InsecureSkipTLSverify {
|
|
args = append(args, "--insecure-skip-tls")
|
|
}
|
|
if obj.Spec.Git.CABundle != nil {
|
|
volumeMounts = append(volumeMounts, corev1.VolumeMount{
|
|
Name: bundleCAVolumeName,
|
|
MountPath: "/gitjob/cabundle",
|
|
})
|
|
args = append(args, "--ca-bundle-file", "/gitjob/cabundle/"+bundleCAFile)
|
|
}
|
|
|
|
return corev1.Container{
|
|
Command: []string{
|
|
"gitcloner",
|
|
},
|
|
Args: args,
|
|
Image: r.Image,
|
|
Name: "gitcloner-initializer",
|
|
VolumeMounts: volumeMounts,
|
|
Env: proxyEnvVars(),
|
|
SecurityContext: &corev1.SecurityContext{
|
|
AllowPrivilegeEscalation: &[]bool{false}[0],
|
|
ReadOnlyRootFilesystem: &[]bool{true}[0],
|
|
Privileged: &[]bool{false}[0],
|
|
Capabilities: &corev1.Capabilities{Drop: []corev1.Capability{"ALL"}},
|
|
RunAsNonRoot: &[]bool{true}[0],
|
|
SeccompProfile: &corev1.SeccompProfile{
|
|
Type: corev1.SeccompProfileTypeRuntimeDefault,
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func proxyEnvVars() []corev1.EnvVar {
|
|
var envVars []corev1.EnvVar
|
|
for _, envVar := range []string{"HTTP_PROXY", "HTTPS_PROXY", "NO_PROXY"} {
|
|
if val, ok := os.LookupEnv(envVar); ok {
|
|
envVars = append(envVars, corev1.EnvVar{Name: envVar, Value: val})
|
|
}
|
|
}
|
|
|
|
return envVars
|
|
}
|