579 lines
20 KiB
Go
579 lines
20 KiB
Go
package cron
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/Knetic/govaluate"
|
|
"github.com/robfig/cron/v3"
|
|
log "github.com/sirupsen/logrus"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
|
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/client-go/util/retry"
|
|
|
|
argoerrs "github.com/argoproj/argo-workflows/v3/errors"
|
|
|
|
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
|
|
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
|
|
typed "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
|
|
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
|
|
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
|
|
"github.com/argoproj/argo-workflows/v3/util/expr/argoexpr"
|
|
"github.com/argoproj/argo-workflows/v3/util/logging"
|
|
"github.com/argoproj/argo-workflows/v3/util/template"
|
|
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
|
|
"github.com/argoproj/argo-workflows/v3/workflow/common"
|
|
"github.com/argoproj/argo-workflows/v3/workflow/metrics"
|
|
|
|
"github.com/argoproj/argo-workflows/v3/workflow/controller/informer"
|
|
"github.com/argoproj/argo-workflows/v3/workflow/util"
|
|
"github.com/argoproj/argo-workflows/v3/workflow/validate"
|
|
)
|
|
|
|
const (
|
|
variablePrefix string = `cronworkflow`
|
|
)
|
|
|
|
type cronWfOperationCtx struct {
|
|
// CronWorkflow is the CronWorkflow to be run
|
|
name string
|
|
cronWf *v1alpha1.CronWorkflow
|
|
wfClientset versioned.Interface
|
|
wfClient typed.WorkflowInterface
|
|
wfDefaults *v1alpha1.Workflow
|
|
cronWfIf typed.CronWorkflowInterface
|
|
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
|
|
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
|
|
log logging.Logger
|
|
metrics *metrics.Metrics
|
|
// scheduledTimeFunc returns the last scheduled time when it is called
|
|
scheduledTimeFunc ScheduledTimeFunc
|
|
}
|
|
|
|
func newCronWfOperationCtx(ctx context.Context, cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface,
|
|
metrics *metrics.Metrics, wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer,
|
|
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer, wfDefaults *v1alpha1.Workflow,
|
|
) *cronWfOperationCtx {
|
|
log := logging.GetLoggerFromContext(ctx)
|
|
if log == nil {
|
|
log = logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat())
|
|
}
|
|
return &cronWfOperationCtx{
|
|
name: cronWorkflow.Name,
|
|
cronWf: cronWorkflow,
|
|
wfClientset: wfClientset,
|
|
wfClient: wfClientset.ArgoprojV1alpha1().Workflows(cronWorkflow.Namespace),
|
|
wfDefaults: wfDefaults,
|
|
cronWfIf: wfClientset.ArgoprojV1alpha1().CronWorkflows(cronWorkflow.Namespace),
|
|
wftmplInformer: wftmplInformer,
|
|
cwftmplInformer: cwftmplInformer,
|
|
log: log.WithFields(logging.Fields{
|
|
"workflow": cronWorkflow.Name,
|
|
"namespace": cronWorkflow.Namespace,
|
|
}),
|
|
metrics: metrics,
|
|
// inferScheduledTime returns an inferred scheduled time based on the current time and only works if it is called
|
|
// within 59 seconds of the scheduled time. Here it acts as a placeholder until it is replaced by a similar
|
|
// function that returns the last scheduled time deterministically from the cron engine. Since we are only able
|
|
// to generate the latter function after the job is scheduled, there is a tiny chance that the job is run before
|
|
// the deterministic function is supplanted. If that happens, we use the infer function as the next-best thing
|
|
scheduledTimeFunc: inferScheduledTime,
|
|
}
|
|
}
|
|
|
|
// Run handles the running of a cron workflow
|
|
// It fits the github.com/robfig/cron.Job interface
|
|
func (woc *cronWfOperationCtx) Run() {
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
ctx = logging.WithLogger(ctx, woc.log)
|
|
woc.run(ctx, woc.scheduledTimeFunc())
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) run(ctx context.Context, scheduledRuntime time.Time) {
|
|
defer woc.persistUpdate(ctx)
|
|
|
|
woc.log.Infof(ctx, "Running %s", woc.name)
|
|
|
|
// If the cron workflow has a schedule that was just updated, update its annotation
|
|
if woc.cronWf.IsUsingNewSchedule() {
|
|
woc.cronWf.SetSchedule(woc.cronWf.Spec.GetScheduleWithTimezoneString())
|
|
}
|
|
|
|
err := woc.validateCronWorkflow(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
completed, err := woc.checkStopingCondition()
|
|
if err != nil {
|
|
woc.reportCronWorkflowError(ctx, v1alpha1.ConditionTypeSpecError, fmt.Sprintf("failed to check CronWorkflow '%s' stopping condition: %s", woc.cronWf.Name, err))
|
|
return
|
|
} else if completed {
|
|
woc.setAsCompleted()
|
|
}
|
|
|
|
proceed, err := woc.enforceRuntimePolicy(ctx)
|
|
if err != nil {
|
|
woc.reportCronWorkflowError(ctx, v1alpha1.ConditionTypeSubmissionError, fmt.Sprintf("run policy error: %s", err))
|
|
return
|
|
} else if !proceed {
|
|
return
|
|
}
|
|
|
|
woc.metrics.CronWfTrigger(ctx, woc.name, woc.cronWf.Namespace)
|
|
|
|
wf := common.ConvertCronWorkflowToWorkflowWithProperties(woc.cronWf, getChildWorkflowName(woc.cronWf.Name, scheduledRuntime), scheduledRuntime)
|
|
|
|
runWf, err := util.SubmitWorkflow(ctx, woc.wfClient, woc.wfClientset, woc.cronWf.Namespace, wf, woc.wfDefaults, &v1alpha1.SubmitOpts{})
|
|
if err != nil {
|
|
// If the workflow already exists (i.e. this is a duplicate submission), do not report an error
|
|
if errors.IsAlreadyExists(err) {
|
|
return
|
|
}
|
|
woc.reportCronWorkflowError(ctx, v1alpha1.ConditionTypeSubmissionError, fmt.Sprintf("Failed to submit Workflow: %s", err))
|
|
return
|
|
}
|
|
|
|
woc.cronWf.Status.Active = append(woc.cronWf.Status.Active, getWorkflowObjectReference(wf, runWf))
|
|
woc.cronWf.Status.Phase = v1alpha1.ActivePhase
|
|
woc.cronWf.Status.LastScheduledTime = &v1.Time{Time: scheduledRuntime}
|
|
woc.cronWf.Status.Conditions.RemoveCondition(v1alpha1.ConditionTypeSubmissionError)
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) validateCronWorkflow(ctx context.Context) error {
|
|
wftmplGetter := informer.NewWorkflowTemplateFromInformerGetter(woc.wftmplInformer, woc.cronWf.Namespace)
|
|
cwftmplGetter := informer.NewClusterWorkflowTemplateFromInformerGetter(woc.cwftmplInformer)
|
|
err := validate.ValidateCronWorkflow(ctx, wftmplGetter, cwftmplGetter, woc.cronWf, woc.wfDefaults)
|
|
if err != nil {
|
|
woc.reportCronWorkflowError(ctx, v1alpha1.ConditionTypeSpecError, fmt.Sprint(err))
|
|
} else {
|
|
woc.cronWf.Status.Conditions.RemoveCondition(v1alpha1.ConditionTypeSpecError)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func getWorkflowObjectReference(wf *v1alpha1.Workflow, runWf *v1alpha1.Workflow) corev1.ObjectReference {
|
|
// This is a bit of a hack. Ideally we'd use ref.GetReference, but for some reason the `runWf` object is coming back
|
|
// without `Kind` and `APIVersion` set (even though it it set on `wf`). To fix this, we hard code those values.
|
|
return corev1.ObjectReference{
|
|
Kind: wf.Kind,
|
|
APIVersion: wf.APIVersion,
|
|
Name: runWf.GetName(),
|
|
Namespace: runWf.GetNamespace(),
|
|
UID: runWf.GetUID(),
|
|
ResourceVersion: runWf.GetResourceVersion(),
|
|
}
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) persistUpdate(ctx context.Context) {
|
|
woc.patch(ctx, map[string]interface{}{"status": woc.cronWf.Status, "metadata": map[string]interface{}{"annotations": woc.cronWf.Annotations, "labels": woc.cronWf.Labels}})
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) persistCurrentWorkflowStatus(ctx context.Context) {
|
|
woc.patch(ctx, map[string]interface{}{"status": map[string]interface{}{"active": woc.cronWf.Status.Active, "succeeded": woc.cronWf.Status.Succeeded, "failed": woc.cronWf.Status.Failed, "phase": woc.cronWf.Status.Phase}})
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) patch(ctx context.Context, patch map[string]interface{}) {
|
|
data, err := json.Marshal(patch)
|
|
if err != nil {
|
|
woc.log.WithError(err).Error(ctx, "failed to marshall cron workflow status.active data")
|
|
return
|
|
}
|
|
err = waitutil.Backoff(retry.DefaultBackoff, func() (bool, error) {
|
|
cronWf, err := woc.cronWfIf.Patch(ctx, woc.cronWf.Name, types.MergePatchType, data, v1.PatchOptions{})
|
|
if err != nil {
|
|
return !errorsutil.IsTransientErr(ctx, err), err
|
|
}
|
|
woc.cronWf = cronWf
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
woc.log.WithError(err).Error(ctx, "failed to update cron workflow")
|
|
return
|
|
}
|
|
}
|
|
|
|
// TODO: refactor shouldExecute in steps.go
|
|
func shouldExecute(when string) (bool, error) {
|
|
if when == "" {
|
|
return true, nil
|
|
}
|
|
expression, err := govaluate.NewEvaluableExpression(when)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
result, err := expression.Evaluate(nil)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
boolRes, ok := result.(bool)
|
|
if !ok {
|
|
return false, argoerrs.Errorf(argoerrs.CodeBadRequest, "Expected boolean evaluation for '%s'. Got %v", when, result)
|
|
}
|
|
return boolRes, nil
|
|
}
|
|
|
|
func evalWhen(cron *v1alpha1.CronWorkflow) (bool, error) {
|
|
if cron.Spec.When == "" {
|
|
return true, nil
|
|
}
|
|
|
|
t, err := template.NewTemplate(string(cron.Spec.When))
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
env := make(map[string]interface{})
|
|
addSetField := func(name string, value interface{}) {
|
|
env[fmt.Sprintf("%s.%s", variablePrefix, name)] = value
|
|
}
|
|
err = expressionEnv(cron, addSetField)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
newWhenStr, err := t.Replace(env, false)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
newCron := cron.DeepCopy()
|
|
newCron.Spec.When = newWhenStr
|
|
|
|
return shouldExecute(newCron.Spec.When)
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) enforceRuntimePolicy(ctx context.Context) (bool, error) {
|
|
if woc.cronWf.Spec.Suspend {
|
|
woc.log.Infof(ctx, "%s is suspended, skipping execution", woc.name)
|
|
return false, nil
|
|
}
|
|
|
|
if woc.cronWf.Status.Phase == v1alpha1.StoppedPhase {
|
|
woc.log.Infof(ctx, "CronWorkflow %s is marked as stopped since it achieved the stopping condition", woc.cronWf.Name)
|
|
return false, nil
|
|
}
|
|
|
|
canProceed, err := evalWhen(woc.cronWf)
|
|
if err != nil || !canProceed {
|
|
return canProceed, err
|
|
}
|
|
|
|
if woc.cronWf.Spec.ConcurrencyPolicy != "" {
|
|
switch woc.cronWf.Spec.ConcurrencyPolicy {
|
|
case v1alpha1.AllowConcurrent, "":
|
|
// Do nothing
|
|
case v1alpha1.ForbidConcurrent:
|
|
if len(woc.cronWf.Status.Active) > 0 {
|
|
woc.metrics.CronWfPolicy(ctx, woc.name, woc.cronWf.Namespace, v1alpha1.ForbidConcurrent)
|
|
woc.log.Infof(ctx, "%s has 'ConcurrencyPolicy: Forbid' and has an active Workflow so it was not run", woc.name)
|
|
return false, nil
|
|
}
|
|
case v1alpha1.ReplaceConcurrent:
|
|
if len(woc.cronWf.Status.Active) > 0 {
|
|
woc.metrics.CronWfPolicy(ctx, woc.name, woc.cronWf.Namespace, v1alpha1.ReplaceConcurrent)
|
|
woc.log.Infof(ctx, "%s has 'ConcurrencyPolicy: Replace' and has active Workflows", woc.name)
|
|
err := woc.terminateOutstandingWorkflows(ctx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
default:
|
|
return false, fmt.Errorf("invalid ConcurrencyPolicy: %s", woc.cronWf.Spec.ConcurrencyPolicy)
|
|
}
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) terminateOutstandingWorkflows(ctx context.Context) error {
|
|
for _, wfObjectRef := range woc.cronWf.Status.Active {
|
|
woc.log.Infof(ctx, "stopping '%s'", wfObjectRef.Name)
|
|
err := util.TerminateWorkflow(ctx, woc.wfClient, wfObjectRef.Name)
|
|
if err != nil {
|
|
if errors.IsNotFound(err) {
|
|
woc.log.Warnf(ctx, "workflow %q not found when trying to terminate outstanding workflows", wfObjectRef.Name)
|
|
continue
|
|
}
|
|
alreadyShutdownErr, ok := err.(util.AlreadyShutdownError)
|
|
if ok {
|
|
woc.log.Warn(ctx, alreadyShutdownErr.Error())
|
|
continue
|
|
}
|
|
return fmt.Errorf("error stopping workflow %s: %e", wfObjectRef.Name, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) runOutstandingWorkflows(ctx context.Context) (bool, error) {
|
|
missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun(ctx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !missedExecutionTime.IsZero() {
|
|
woc.run(ctx, missedExecutionTime)
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun(ctx context.Context) (time.Time, error) {
|
|
// If the CronWorkflow schedule was just updated, then do not run any outstanding workflows.
|
|
if woc.cronWf.IsUsingNewSchedule() {
|
|
return time.Time{}, nil
|
|
}
|
|
// If this CronWorkflow has been run before, check if we have missed any scheduled executions
|
|
if woc.cronWf.Status.LastScheduledTime != nil {
|
|
for _, schedule := range woc.cronWf.Spec.GetSchedulesWithTimezone(ctx) {
|
|
var now time.Time
|
|
var cronSchedule cron.Schedule
|
|
now = time.Now()
|
|
cronSchedule, err := cron.ParseStandard(schedule)
|
|
if err != nil {
|
|
return time.Time{}, err
|
|
}
|
|
|
|
var missedExecutionTime time.Time
|
|
nextScheduledRunTime := cronSchedule.Next(woc.cronWf.Status.LastScheduledTime.Time)
|
|
// Workflow should have ran
|
|
for nextScheduledRunTime.Before(now) {
|
|
missedExecutionTime = nextScheduledRunTime
|
|
nextScheduledRunTime = cronSchedule.Next(missedExecutionTime)
|
|
}
|
|
|
|
// We missed the latest execution time
|
|
if !missedExecutionTime.IsZero() {
|
|
// if missedExecutionTime is within StartDeadlineSeconds, We are still within the deadline window, run the Workflow
|
|
if woc.cronWf.Spec.StartingDeadlineSeconds != nil && now.Before(missedExecutionTime.Add(time.Duration(*woc.cronWf.Spec.StartingDeadlineSeconds)*time.Second)) {
|
|
woc.log.Infof(ctx, "%s missed an execution at %s and is within StartingDeadline", woc.cronWf.Name, missedExecutionTime.Format("Mon Jan _2 15:04:05 2006"))
|
|
return missedExecutionTime, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return time.Time{}, nil
|
|
}
|
|
|
|
type fulfilledWfsPhase struct {
|
|
fulfilled bool
|
|
phase v1alpha1.WorkflowPhase
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) reconcileActiveWfs(ctx context.Context, workflows []v1alpha1.Workflow) error {
|
|
updated := false
|
|
currentWfsFulfilled := make(map[types.UID]fulfilledWfsPhase, len(workflows))
|
|
for _, wf := range workflows {
|
|
currentWfsFulfilled[wf.UID] = fulfilledWfsPhase{
|
|
fulfilled: wf.Status.Fulfilled(),
|
|
phase: wf.Status.Phase,
|
|
}
|
|
if !woc.cronWf.Status.HasActiveUID(wf.UID) && !wf.Status.Fulfilled() {
|
|
updated = true
|
|
woc.cronWf.Status.Active = append(woc.cronWf.Status.Active, getWorkflowObjectReference(&wf, &wf))
|
|
}
|
|
}
|
|
|
|
for _, objectRef := range woc.cronWf.Status.Active {
|
|
if fulfilled, found := currentWfsFulfilled[objectRef.UID]; !found || fulfilled.fulfilled {
|
|
updated = true
|
|
woc.removeFromActiveList(objectRef.UID)
|
|
if found && fulfilled.fulfilled {
|
|
woc.updateWfPhaseCounter(fulfilled.phase)
|
|
completed, err := woc.checkStopingCondition()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check CronWorkflow '%s' stopping condition: %s", woc.cronWf.Name, err)
|
|
} else if completed {
|
|
woc.setAsCompleted()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if updated {
|
|
woc.persistCurrentWorkflowStatus(ctx)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) removeFromActiveList(uid types.UID) {
|
|
var newActive []corev1.ObjectReference
|
|
for _, ref := range woc.cronWf.Status.Active {
|
|
if ref.UID != uid {
|
|
newActive = append(newActive, ref)
|
|
}
|
|
}
|
|
woc.cronWf.Status.Active = newActive
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) enforceHistoryLimit(ctx context.Context, workflows []v1alpha1.Workflow) error {
|
|
woc.log.Debugf(ctx, "Enforcing history limit for '%s'", woc.cronWf.Name)
|
|
|
|
var successfulWorkflows []v1alpha1.Workflow
|
|
var failedWorkflows []v1alpha1.Workflow
|
|
for _, wf := range workflows {
|
|
if wf.Labels[common.LabelKeyCronWorkflow] != woc.cronWf.Name {
|
|
continue
|
|
}
|
|
if wf.Status.Fulfilled() {
|
|
if wf.Status.Successful() {
|
|
successfulWorkflows = append(successfulWorkflows, wf)
|
|
} else {
|
|
failedWorkflows = append(failedWorkflows, wf)
|
|
}
|
|
}
|
|
}
|
|
|
|
workflowsToKeep := int32(3)
|
|
if woc.cronWf.Spec.SuccessfulJobsHistoryLimit != nil && *woc.cronWf.Spec.SuccessfulJobsHistoryLimit >= 0 {
|
|
workflowsToKeep = *woc.cronWf.Spec.SuccessfulJobsHistoryLimit
|
|
}
|
|
err := woc.deleteOldestWorkflows(ctx, successfulWorkflows, int(workflowsToKeep))
|
|
if err != nil {
|
|
return fmt.Errorf("unable to delete Successful Workflows of CronWorkflow '%s': %s", woc.cronWf.Name, err)
|
|
}
|
|
|
|
workflowsToKeep = int32(1)
|
|
if woc.cronWf.Spec.FailedJobsHistoryLimit != nil && *woc.cronWf.Spec.FailedJobsHistoryLimit >= 0 {
|
|
workflowsToKeep = *woc.cronWf.Spec.FailedJobsHistoryLimit
|
|
}
|
|
err = woc.deleteOldestWorkflows(ctx, failedWorkflows, int(workflowsToKeep))
|
|
if err != nil {
|
|
return fmt.Errorf("unable to delete Failed Workflows of CronWorkflow '%s': %s", woc.cronWf.Name, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) deleteOldestWorkflows(ctx context.Context, jobList []v1alpha1.Workflow, workflowsToKeep int) error {
|
|
if workflowsToKeep >= len(jobList) {
|
|
return nil
|
|
}
|
|
|
|
sort.SliceStable(jobList, func(i, j int) bool {
|
|
return jobList[i].Status.FinishedAt.After(jobList[j].Status.FinishedAt.Time)
|
|
})
|
|
|
|
for _, wf := range jobList[workflowsToKeep:] {
|
|
err := woc.wfClient.Delete(ctx, wf.Name, v1.DeleteOptions{})
|
|
if err != nil {
|
|
if errors.IsNotFound(err) {
|
|
woc.log.Infof(ctx, "Workflow '%s' was already deleted", wf.Name)
|
|
continue
|
|
}
|
|
return fmt.Errorf("error deleting workflow '%s': %e", wf.Name, err)
|
|
}
|
|
woc.log.Infof(ctx, "Deleted Workflow '%s' due to CronWorkflow '%s' history limit", wf.Name, woc.cronWf.Name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) reportCronWorkflowError(ctx context.Context, conditionType v1alpha1.ConditionType, errString string) {
|
|
woc.log.WithField("conditionType", conditionType).Error(ctx, errString)
|
|
woc.cronWf.Status.Conditions.UpsertCondition(v1alpha1.Condition{
|
|
Type: conditionType,
|
|
Message: errString,
|
|
Status: v1.ConditionTrue,
|
|
})
|
|
if conditionType == v1alpha1.ConditionTypeSpecError {
|
|
woc.metrics.CronWorkflowSpecError(ctx)
|
|
} else {
|
|
if conditionType == v1alpha1.ConditionTypeSubmissionError {
|
|
woc.cronWf.Status.Failed++
|
|
}
|
|
woc.metrics.CronWorkflowSubmissionError(ctx)
|
|
}
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) updateWfPhaseCounter(phase v1alpha1.WorkflowPhase) {
|
|
switch phase {
|
|
case v1alpha1.WorkflowError, v1alpha1.WorkflowFailed:
|
|
woc.cronWf.Status.Failed++
|
|
case v1alpha1.WorkflowSucceeded:
|
|
woc.cronWf.Status.Succeeded++
|
|
}
|
|
}
|
|
|
|
func expressionEnv(cron *v1alpha1.CronWorkflow, addSetField func(name string, value interface{})) error {
|
|
addSetField("name", cron.Name)
|
|
addSetField("namespace", cron.Namespace)
|
|
addSetField("labels", cron.Labels)
|
|
addSetField("annotations", cron.Labels)
|
|
addSetField("failed", cron.Status.Failed)
|
|
addSetField("succeeded", cron.Status.Succeeded)
|
|
|
|
labelsStr, err := json.Marshal(&cron.Labels)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
annotationsStr, err := json.Marshal(&cron.Annotations)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
addSetField("annotations.json", annotationsStr)
|
|
addSetField("labels.json", labelsStr)
|
|
|
|
var tm *time.Time
|
|
tm = nil
|
|
|
|
if cron.Status.LastScheduledTime != nil {
|
|
tm = &cron.Status.LastScheduledTime.Time
|
|
}
|
|
|
|
addSetField("lastScheduledTime", tm)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) checkStopingCondition() (bool, error) {
|
|
if woc.cronWf.Spec.StopStrategy == nil {
|
|
return false, nil
|
|
}
|
|
prefixedEnv := make(map[string]interface{})
|
|
addSetField := func(name string, value interface{}) {
|
|
prefixedEnv[name] = value
|
|
}
|
|
env := make(map[string]interface{})
|
|
env[variablePrefix] = prefixedEnv
|
|
err := expressionEnv(woc.cronWf, addSetField)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
suspend, err := argoexpr.EvalBool(woc.cronWf.Spec.StopStrategy.Expression, env)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to evaluate stop expression: %w", err)
|
|
}
|
|
return suspend, nil
|
|
}
|
|
|
|
func (woc *cronWfOperationCtx) setAsCompleted() {
|
|
woc.cronWf.Status.Phase = v1alpha1.StoppedPhase
|
|
if woc.cronWf.Labels == nil {
|
|
woc.cronWf.Labels = map[string]string{}
|
|
}
|
|
woc.cronWf.Labels[common.LabelKeyCronWorkflowCompleted] = "true"
|
|
}
|
|
|
|
func inferScheduledTime() time.Time {
|
|
// Infer scheduled runtime by getting current time and zeroing out current seconds and nanoseconds
|
|
// This works because the finest possible scheduled runtime is a minute. It is unlikely to ever be used, since this
|
|
// function is quickly supplanted by a deterministic function from the cron engine.
|
|
now := time.Now().UTC()
|
|
scheduledTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), 0, 0, now.Location())
|
|
|
|
log.Infof("inferred scheduled time: %s", scheduledTime)
|
|
return scheduledTime
|
|
}
|
|
|
|
func getChildWorkflowName(cronWorkflowName string, scheduledRuntime time.Time) string {
|
|
return fmt.Sprintf("%s-%d", cronWorkflowName, scheduledRuntime.Unix())
|
|
}
|