argo-workflows/workflow/cron/controller.go

352 lines
12 KiB
Go

package cron
import (
"context"
"fmt"
"reflect"
"time"
"github.com/argoproj/pkg/sync"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
log "github.com/argoproj/argo-workflows/v3/util/logging"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
wfctx "github.com/argoproj/argo-workflows/v3/util/context"
"github.com/argoproj/argo-workflows/v3/util/env"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/events"
"github.com/argoproj/argo-workflows/v3/workflow/metrics"
"github.com/argoproj/argo-workflows/v3/workflow/util"
)
// Controller is a controller for cron workflows
type Controller struct {
namespace string
managedNamespace string
instanceID string
cron *cronFacade
keyLock sync.KeyLock
wfClientset versioned.Interface
wfLister util.WorkflowLister
cronWfInformer informers.GenericInformer
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
wfDefaults *v1alpha1.Workflow
cronWfQueue workqueue.TypedRateLimitingInterface[string]
dynamicInterface dynamic.Interface
metrics *metrics.Metrics
eventRecorderManager events.EventRecorderManager
cronWorkflowWorkers int
logger log.Logger
}
const (
cronWorkflowResyncPeriod = 20 * time.Minute
)
var cronSyncPeriod time.Duration
func init() {
slog := log.NewSlogLogger(log.GetGlobalLevel(), log.GetGlobalFormat())
ctx := log.WithLogger(context.TODO(), log.NewSlogLogger(log.GetGlobalLevel(), log.GetGlobalFormat()))
ctx = log.WithLogger(ctx, slog)
// this make sure we support timezones
_, err := time.Parse(time.RFC822, "17 Oct 07 14:03 PST")
if err != nil {
slog.WithFatal().Error(ctx, err.Error())
}
cronSyncPeriod = env.LookupEnvDurationOr(ctx, "CRON_SYNC_PERIOD", 10*time.Second)
slog.WithField("cronSyncPeriod", cronSyncPeriod).Info(ctx, "cron config")
}
// NewCronController creates a new cron controller
func NewCronController(ctx context.Context, wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceID string, metrics *metrics.Metrics,
eventRecorderManager events.EventRecorderManager, cronWorkflowWorkers int, wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer, cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer, wfDefaults *v1alpha1.Workflow,
) *Controller {
logger := log.NewSlogLogger(log.GetGlobalLevel(), log.GetGlobalFormat())
logger = logger.WithField("component", "cron")
ctx = log.WithLogger(ctx, logger)
return &Controller{
wfClientset: wfclientset,
namespace: namespace,
managedNamespace: managedNamespace,
instanceID: instanceID,
cron: newCronFacade(),
keyLock: sync.NewKeyLock(),
dynamicInterface: dynamicInterface,
cronWfQueue: metrics.RateLimiterWithBusyWorkers(ctx, workqueue.DefaultTypedControllerRateLimiter[string](), "cron_wf_queue"),
wfDefaults: wfDefaults,
metrics: metrics,
eventRecorderManager: eventRecorderManager,
wftmplInformer: wftmplInformer,
cwftmplInformer: cwftmplInformer,
cronWorkflowWorkers: cronWorkflowWorkers,
logger: logger,
}
}
// Run start the cron controller
func (cc *Controller) Run(ctx context.Context) {
defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...)
defer cc.cronWfQueue.ShutDown()
cc.logger.Infof(ctx, "Starting CronWorkflow controller")
if cc.instanceID != "" {
cc.logger.Infof(ctx, "...with InstanceID: %s", cc.instanceID)
}
cc.cronWfInformer = dynamicinformer.NewFilteredDynamicSharedInformerFactory(cc.dynamicInterface, cronWorkflowResyncPeriod, cc.managedNamespace, func(options *v1.ListOptions) {
cronWfInformerListOptionsFunc(options, cc.instanceID)
}).ForResource(schema.GroupVersionResource{Group: workflow.Group, Version: workflow.Version, Resource: workflow.CronWorkflowPlural})
err := cc.addCronWorkflowInformerHandler(ctx)
if err != nil {
cc.logger.WithFatal().Error(ctx, err.Error())
}
wfInformer := util.NewWorkflowInformer(cc.dynamicInterface, cc.managedNamespace, cronWorkflowResyncPeriod,
func(options *v1.ListOptions) { wfInformerListOptionsFunc(options, cc.instanceID) },
func(options *v1.ListOptions) { wfInformerListOptionsFunc(options, cc.instanceID) },
cache.Indexers{})
go wfInformer.Run(ctx.Done())
cc.wfLister = util.NewWorkflowLister(wfInformer)
cc.cron.Start()
defer cc.cron.Stop()
go cc.cronWfInformer.Informer().Run(ctx.Done())
go wait.UntilWithContext(ctx, cc.syncAll, cronSyncPeriod)
for i := 0; i < cc.cronWorkflowWorkers; i++ {
go wait.Until(cc.runCronWorker, time.Second, ctx.Done())
}
<-ctx.Done()
}
func (cc *Controller) runCronWorker() {
ctx := log.WithLogger(context.TODO(), log.NewSlogLogger(log.GetGlobalLevel(), log.GetGlobalFormat()))
ctx = log.WithLogger(ctx, log.NewSlogLogger(log.GetGlobalLevel(), log.GetGlobalFormat()))
ctx = log.WithLogger(ctx, cc.logger)
for cc.processNextCronItem(ctx) {
}
}
func (cc *Controller) processNextCronItem(ctx context.Context) bool {
defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...)
key, quit := cc.cronWfQueue.Get()
if quit {
return false
}
defer cc.cronWfQueue.Done(key)
cc.keyLock.Lock(key)
defer cc.keyLock.Unlock(key)
logger := cc.logger.WithField("cronWorkflow", key)
logger.Infof(ctx, "Processing %s", key)
obj, exists, err := cc.cronWfInformer.Informer().GetIndexer().GetByKey(key)
if err != nil {
logger.WithError(err).Error(ctx, fmt.Sprintf("Failed to get CronWorkflow '%s' from informer index", key))
return true
}
if !exists {
logger.Infof(ctx, "Deleting '%s'", key)
cc.cron.Delete(key)
return true
}
un, ok := obj.(*unstructured.Unstructured)
if !ok {
logger.Errorf(ctx, "malformed cluster workflow template: expected *unstructured.Unstructured, got %s", reflect.TypeOf(obj).Name())
return true
}
cronWf := &v1alpha1.CronWorkflow{}
err = util.FromUnstructuredObj(un, cronWf)
if err != nil {
cc.eventRecorderManager.Get(un.GetNamespace()).Event(un, apiv1.EventTypeWarning, "Malformed", err.Error())
logger.WithError(err).Error(ctx, "malformed cron workflow: could not convert from unstructured")
return true
}
ctx = wfctx.InjectObjectMeta(ctx, &cronWf.ObjectMeta)
cronWorkflowOperationCtx := newCronWfOperationCtx(ctx, cronWf, cc.wfClientset, cc.metrics, cc.wftmplInformer, cc.cwftmplInformer, cc.wfDefaults)
err = cronWorkflowOperationCtx.validateCronWorkflow(ctx)
if err != nil {
logger.WithError(err).Error(ctx, "invalid cron workflow")
return true
}
wfWasRun, err := cronWorkflowOperationCtx.runOutstandingWorkflows(ctx)
if err != nil {
logger.WithError(err).Error(ctx, "could not run outstanding Workflow")
return true
} else if wfWasRun {
// A workflow was run, so the cron workflow will be requeued. Return here to avoid duplicating work
return true
}
// The job is currently scheduled, remove it and re add it.
cc.cron.Delete(key)
for _, schedule := range cronWf.Spec.GetSchedulesWithTimezone(ctx) {
lastScheduledTimeFunc, err := cc.cron.AddJob(key, schedule, cronWorkflowOperationCtx)
if err != nil {
logger.WithError(err).Error(ctx, "could not schedule CronWorkflow")
return true
}
cronWorkflowOperationCtx.scheduledTimeFunc = lastScheduledTimeFunc
}
logger.Infof(ctx, "CronWorkflow %s added", key)
return true
}
func (cc *Controller) addCronWorkflowInformerHandler(ctx context.Context) error {
_, err := cc.cronWfInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
cc.logger.Warnf(ctx, "Cron Workflow FilterFunc: '%v' is not an unstructured", obj)
return false
}
return !isCompleted(un)
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
cc.cronWfQueue.Add(key)
}
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
cc.cronWfQueue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
cc.cronWfQueue.Add(key)
}
},
},
})
if err != nil {
return err
}
return nil
}
func isCompleted(wf v1.Object) bool {
completed, ok := wf.GetLabels()[common.LabelKeyCronWorkflowCompleted]
if !ok {
return false
}
return completed == "true"
}
func (cc *Controller) syncAll(ctx context.Context) {
defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...)
cc.logger.Debug(ctx, "Syncing all CronWorkflows")
workflows, err := cc.wfLister.List()
if err != nil {
return
}
groupedWorkflows := groupWorkflows(workflows)
cronWorkflows := cc.cronWfInformer.Informer().GetStore().List()
for _, obj := range cronWorkflows {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
cc.logger.Error(ctx, "Unable to convert object to unstructured when syncing CronWorkflows")
continue
}
cronWf := &v1alpha1.CronWorkflow{}
err := util.FromUnstructuredObj(un, cronWf)
if err != nil {
cc.logger.WithError(err).Error(ctx, "Unable to convert unstructured to CronWorkflow when syncing CronWorkflows")
continue
}
err = cc.syncCronWorkflow(ctx, cronWf, groupedWorkflows[cronWf.UID])
if err != nil {
cc.logger.WithError(err).Error(ctx, "Unable to sync CronWorkflow")
continue
}
}
}
func (cc *Controller) syncCronWorkflow(ctx context.Context, cronWf *v1alpha1.CronWorkflow, workflows []v1alpha1.Workflow) error {
key := cronWf.Namespace + "/" + cronWf.Name
cc.keyLock.Lock(key)
defer cc.keyLock.Unlock(key)
cwoc := newCronWfOperationCtx(ctx, cronWf, cc.wfClientset, cc.metrics, cc.wftmplInformer, cc.cwftmplInformer, cc.wfDefaults)
err := cwoc.enforceHistoryLimit(ctx, workflows)
if err != nil {
return err
}
err = cwoc.reconcileActiveWfs(ctx, workflows)
if err != nil {
return err
}
return nil
}
func groupWorkflows(wfs []*v1alpha1.Workflow) map[types.UID][]v1alpha1.Workflow {
cwfChildren := make(map[types.UID][]v1alpha1.Workflow)
for _, wf := range wfs {
owner := v1.GetControllerOf(wf)
if owner == nil || owner.Kind != workflow.CronWorkflowKind {
continue
}
cwfChildren[owner.UID] = append(cwfChildren[owner.UID], *wf)
}
return cwfChildren
}
func cronWfInformerListOptionsFunc(options *v1.ListOptions, instanceID string) {
options.FieldSelector = fields.Everything().String()
labelSelector := labels.NewSelector().Add(util.InstanceIDRequirement(instanceID))
options.LabelSelector = labelSelector.String()
}
func wfInformerListOptionsFunc(options *v1.ListOptions, instanceID string) {
options.FieldSelector = fields.Everything().String()
isCronWorkflowChildReq, err := labels.NewRequirement(common.LabelKeyCronWorkflow, selection.Exists, []string{})
if err != nil {
panic(err)
}
labelSelector := labels.NewSelector().Add(*isCronWorkflowChildReq)
labelSelector = labelSelector.Add(util.InstanceIDRequirement(instanceID))
options.LabelSelector = labelSelector.String()
}