karmada/pkg/scheduler/scheduler.go


package scheduler
import (
"context"
"encoding/json"
"fmt"
"reflect"
"time"
jsonpatch "github.com/evanphx/json-patch/v5"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client"
"github.com/karmada-io/karmada/pkg/events"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
policylister "github.com/karmada-io/karmada/pkg/generated/listers/policy/v1alpha1"
worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2"
schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/core"
frameworkplugins "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
utilmetrics "github.com/karmada-io/karmada/pkg/util/metrics"
)
// ScheduleType defines the type of scheduling that should be performed for a binding object.
type ScheduleType string
const (
// ReconcileSchedule means the policy associated with the binding object has been changed.
ReconcileSchedule ScheduleType = "ReconcileSchedule"
// ScaleSchedule means the replicas of the binding object have been changed.
ScaleSchedule ScheduleType = "ScaleSchedule"
)
const (
scheduleSuccessReason = "BindingScheduled"
scheduleFailedReason = "BindingFailedScheduling"
scheduleSuccessMessage = "Binding has been scheduled"
)
// Scheduler is the scheduler schema, which is used to schedule a specific resource to specific clusters
type Scheduler struct {
DynamicClient dynamic.Interface
KarmadaClient karmadaclientset.Interface
KubeClient kubernetes.Interface
bindingLister worklister.ResourceBindingLister
policyLister policylister.PropagationPolicyLister
clusterBindingLister worklister.ClusterResourceBindingLister
clusterPolicyLister policylister.ClusterPropagationPolicyLister
clusterLister clusterlister.ClusterLister
informerFactory informerfactory.SharedInformerFactory
// TODO: implement a priority scheduling queue
queue workqueue.RateLimitingInterface
Algorithm core.ScheduleAlgorithm
schedulerCache schedulercache.Cache
eventRecorder record.EventRecorder
enableSchedulerEstimator bool
disableSchedulerEstimatorInPullMode bool
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorServicePrefix string
schedulerEstimatorPort int
schedulerEstimatorWorker util.AsyncWorker
enableEmptyWorkloadPropagation bool
}
type schedulerOptions struct {
// enableSchedulerEstimator represents whether the accurate scheduler estimator should be enabled.
enableSchedulerEstimator bool
// disableSchedulerEstimatorInPullMode represents whether to disable the scheduler estimator in pull mode.
disableSchedulerEstimatorInPullMode bool
// schedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
schedulerEstimatorTimeout metav1.Duration
// schedulerEstimatorServicePrefix represents the prefix of the accurate scheduler estimator service name.
schedulerEstimatorServicePrefix string
// schedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
schedulerEstimatorPort int
// enableEmptyWorkloadPropagation represents whether workloads with replicas 0 are allowed to be propagated to member clusters.
enableEmptyWorkloadPropagation bool
// outOfTreeRegistry represents the registry of out-of-tree plugins
outOfTreeRegistry runtime.Registry
// plugins is the list of plugins to enable or disable
plugins []string
}
// Option configures a Scheduler
type Option func(*schedulerOptions)
// WithEnableSchedulerEstimator sets the enableSchedulerEstimator for scheduler
func WithEnableSchedulerEstimator(enableSchedulerEstimator bool) Option {
return func(o *schedulerOptions) {
o.enableSchedulerEstimator = enableSchedulerEstimator
}
}
// WithDisableSchedulerEstimatorInPullMode sets the disableSchedulerEstimatorInPullMode for scheduler
func WithDisableSchedulerEstimatorInPullMode(disableSchedulerEstimatorInPullMode bool) Option {
return func(o *schedulerOptions) {
o.disableSchedulerEstimatorInPullMode = disableSchedulerEstimatorInPullMode
}
}
// WithSchedulerEstimatorTimeout sets the schedulerEstimatorTimeout for scheduler
func WithSchedulerEstimatorTimeout(schedulerEstimatorTimeout metav1.Duration) Option {
return func(o *schedulerOptions) {
o.schedulerEstimatorTimeout = schedulerEstimatorTimeout
}
}
// WithSchedulerEstimatorServicePrefix sets the schedulerEstimatorServicePrefix for scheduler
func WithSchedulerEstimatorServicePrefix(schedulerEstimatorServicePrefix string) Option {
return func(o *schedulerOptions) {
o.schedulerEstimatorServicePrefix = schedulerEstimatorServicePrefix
}
}
// WithSchedulerEstimatorPort sets the schedulerEstimatorPort for scheduler
func WithSchedulerEstimatorPort(schedulerEstimatorPort int) Option {
return func(o *schedulerOptions) {
o.schedulerEstimatorPort = schedulerEstimatorPort
}
}
// WithEnableEmptyWorkloadPropagation sets the enableEmptyWorkloadPropagation for scheduler
func WithEnableEmptyWorkloadPropagation(enableEmptyWorkloadPropagation bool) Option {
return func(o *schedulerOptions) {
o.enableEmptyWorkloadPropagation = enableEmptyWorkloadPropagation
}
}
// WithEnableSchedulerPlugin sets the plugins to enable or disable for the scheduler
func WithEnableSchedulerPlugin(plugins []string) Option {
return func(o *schedulerOptions) {
o.plugins = plugins
}
}
// WithOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins
// will be appended to the default in-tree registry.
func WithOutOfTreeRegistry(registry runtime.Registry) Option {
return func(o *schedulerOptions) {
o.outOfTreeRegistry = registry
}
}
// NewScheduler instantiates a scheduler
func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientset.Interface, kubeClient kubernetes.Interface, opts ...Option) (*Scheduler, error) {
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
bindingLister := factory.Work().V1alpha2().ResourceBindings().Lister()
policyLister := factory.Policy().V1alpha1().PropagationPolicies().Lister()
clusterBindingLister := factory.Work().V1alpha2().ClusterResourceBindings().Lister()
clusterPolicyLister := factory.Policy().V1alpha1().ClusterPropagationPolicies().Lister()
clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "scheduler-queue")
schedulerCache := schedulercache.NewCache(clusterLister)
options := schedulerOptions{}
for _, opt := range opts {
opt(&options)
}
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.outOfTreeRegistry); err != nil {
return nil, err
}
registry = registry.Filter(options.plugins)
algorithm, err := core.NewGenericScheduler(schedulerCache, registry)
if err != nil {
return nil, err
}
sched := &Scheduler{
DynamicClient: dynamicClient,
KarmadaClient: karmadaClient,
KubeClient: kubeClient,
bindingLister: bindingLister,
policyLister: policyLister,
clusterBindingLister: clusterBindingLister,
clusterPolicyLister: clusterPolicyLister,
clusterLister: clusterLister,
informerFactory: factory,
queue: queue,
Algorithm: algorithm,
schedulerCache: schedulerCache,
}
if options.enableSchedulerEstimator {
sched.enableSchedulerEstimator = options.enableSchedulerEstimator
sched.disableSchedulerEstimatorInPullMode = options.disableSchedulerEstimatorInPullMode
sched.schedulerEstimatorServicePrefix = options.schedulerEstimatorServicePrefix
sched.schedulerEstimatorPort = options.schedulerEstimatorPort
sched.schedulerEstimatorCache = estimatorclient.NewSchedulerEstimatorCache()
schedulerEstimatorWorkerOptions := util.Options{
Name: "scheduler-estimator",
KeyFunc: nil,
ReconcileFunc: sched.reconcileEstimatorConnection,
}
sched.schedulerEstimatorWorker = util.NewAsyncWorker(schedulerEstimatorWorkerOptions)
schedulerEstimator := estimatorclient.NewSchedulerEstimator(sched.schedulerEstimatorCache, options.schedulerEstimatorTimeout.Duration)
estimatorclient.RegisterSchedulerEstimator(schedulerEstimator)
}
sched.enableEmptyWorkloadPropagation = options.enableEmptyWorkloadPropagation
sched.addAllEventHandlers()
return sched, nil
}
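// An illustrative sketch of wiring the scheduler together (not part of the package API;
// the concrete clients, ctx, and the option values shown here are assumptions):
//
//	sched, err := NewScheduler(dynamicClient, karmadaClient, kubeClient,
//		WithEnableSchedulerEstimator(true),
//		WithSchedulerEstimatorTimeout(metav1.Duration{Duration: 3 * time.Second}),
//	)
//	if err != nil {
//		klog.Fatalf("Failed to create scheduler: %v", err)
//	}
//	sched.Run(ctx)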
// Run runs the scheduler
func (s *Scheduler) Run(ctx context.Context) {
stopCh := ctx.Done()
klog.Infof("Starting karmada scheduler")
defer klog.Infof("Shutting down karmada scheduler")
// Establish all connections first and then begin scheduling.
if s.enableSchedulerEstimator {
s.establishEstimatorConnections()
s.schedulerEstimatorWorker.Run(1, stopCh)
}
s.informerFactory.Start(stopCh)
s.informerFactory.WaitForCacheSync(stopCh)
go wait.Until(s.worker, time.Second, stopCh)
<-stopCh
}
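// worker keeps pulling items off the scheduling queue until the queue shuts down.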
func (s *Scheduler) worker() {
for s.scheduleNext() {
}
}
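// getPlacement returns the placement declared by the PropagationPolicy or ClusterPropagationPolicy
// referenced by the binding's labels, along with its JSON string form.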
func (s *Scheduler) getPlacement(resourceBinding *workv1alpha2.ResourceBinding) (policyv1alpha1.Placement, string, error) {
var placement policyv1alpha1.Placement
var clusterPolicyName string
var policyName string
var policyNamespace string
var err error
// Register the error-logging defer before the lookups below so that it also runs on the early returns.
defer func() {
if err != nil {
if clusterPolicyName != "" {
klog.Errorf("Failed to get placement of clusterPropagationPolicy %s, error: %v", clusterPolicyName, err)
} else {
klog.Errorf("Failed to get placement of propagationPolicy %s/%s, error: %v", policyNamespace, policyName, err)
}
}
}()
if clusterPolicyName = util.GetLabelValue(resourceBinding.Labels, policyv1alpha1.ClusterPropagationPolicyLabel); clusterPolicyName != "" {
var clusterPolicy *policyv1alpha1.ClusterPropagationPolicy
clusterPolicy, err = s.clusterPolicyLister.Get(clusterPolicyName)
if err != nil {
return placement, "", err
}
placement = clusterPolicy.Spec.Placement
}
if policyName = util.GetLabelValue(resourceBinding.Labels, policyv1alpha1.PropagationPolicyNameLabel); policyName != "" {
policyNamespace = util.GetLabelValue(resourceBinding.Labels, policyv1alpha1.PropagationPolicyNamespaceLabel)
var policy *policyv1alpha1.PropagationPolicy
policy, err = s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
if err != nil {
return placement, "", err
}
placement = policy.Spec.Placement
}
var placementBytes []byte
placementBytes, err = json.Marshal(placement)
if err != nil {
return placement, "", err
}
return placement, string(placementBytes), nil
}
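// getClusterPlacement returns the placement of the ClusterPropagationPolicy referenced by the
// ClusterResourceBinding's label, along with its JSON string form.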
func (s *Scheduler) getClusterPlacement(crb *workv1alpha2.ClusterResourceBinding) (policyv1alpha1.Placement, string, error) {
var placement policyv1alpha1.Placement
policyName := util.GetLabelValue(crb.Labels, policyv1alpha1.ClusterPropagationPolicyLabel)
policy, err := s.clusterPolicyLister.Get(policyName)
if err != nil {
return placement, "", err
}
placement = policy.Spec.Placement
placementBytes, err := json.Marshal(placement)
if err != nil {
klog.Errorf("Failed to marshal placement of propagationPolicy %s/%s, error: %v", policy.Namespace, policy.Name, err)
return placement, "", err
}
return placement, string(placementBytes), nil
}
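// scheduleNext pops one key from the queue, schedules it, and reports whether the queue is still open.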
func (s *Scheduler) scheduleNext() bool {
key, shutdown := s.queue.Get()
if shutdown {
klog.Errorf("Fail to pop item from queue")
return false
}
defer s.queue.Done(key)
err := s.doSchedule(key.(string))
s.handleErr(err, key)
return true
}
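// doSchedule routes the key to the ResourceBinding or ClusterResourceBinding schedule path
// depending on whether it carries a namespace.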
func (s *Scheduler) doSchedule(key string) error {
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
if len(ns) > 0 {
return s.doScheduleBinding(ns, name)
}
return s.doScheduleClusterBinding(name)
}
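// doScheduleBinding (re)schedules a ResourceBinding when its placement or replicas have changed,
// then records the result in the binding's Scheduled condition.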
func (s *Scheduler) doScheduleBinding(namespace, name string) (err error) {
rb, err := s.bindingLister.ResourceBindings(namespace).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
// the binding does not exist, do nothing
return nil
}
return err
}
// Update "Scheduled" condition according to schedule result.
defer func() {
s.recordScheduleResultEventForResourceBinding(rb, err)
var condition metav1.Condition
if err == nil {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
} else {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
}
if updateErr := s.patchBindingScheduleStatus(rb, condition); updateErr != nil {
klog.Errorf("Failed to patch schedule status to ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, updateErr)
if err == nil {
// Schedule succeeded but the status update failed; return the error so the binding is retried in the next loop.
err = updateErr
}
}
}()
start := time.Now()
policyPlacement, policyPlacementStr, err := s.getPlacement(rb)
if err != nil {
return err
}
if appliedPlacement := util.GetLabelValue(rb.Annotations, util.PolicyPlacementAnnotation); policyPlacementStr != appliedPlacement {
// policy placement changed, need schedule
klog.Infof("Start to schedule ResourceBinding(%s/%s) as placement changed", namespace, name)
err = s.scheduleResourceBinding(rb)
metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
if policyPlacement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&rb.Spec, policyPlacement.ReplicaScheduling) {
// binding replicas changed, need reschedule
klog.Infof("Reschedule ResourceBinding(%s/%s) as replicas scaled down or scaled up", namespace, name)
err = s.scheduleResourceBinding(rb)
metrics.BindingSchedule(string(ScaleSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
if rb.Spec.Replicas == 0 ||
policyPlacement.ReplicaScheduling == nil ||
policyPlacement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
// Duplicated resources should always be scheduled. Note: non-workload resources are considered
// duplicated even if the scheduling type is divided.
klog.V(3).Infof("Start to schedule ResourceBinding(%s/%s) as scheduling type is duplicated", namespace, name)
err = s.scheduleResourceBinding(rb)
metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
// TODO(dddddai): reschedule bindings on cluster change
klog.V(3).Infof("Don't need to schedule ResourceBinding(%s/%s)", namespace, name)
return nil
}
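// doScheduleClusterBinding (re)schedules a ClusterResourceBinding when its placement or replicas
// have changed, then records the result in the binding's Scheduled condition.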
func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
crb, err := s.clusterBindingLister.Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
// the binding does not exist, do nothing
return nil
}
return err
}
// Update "Scheduled" condition according to schedule result.
defer func() {
s.recordScheduleResultEventForClusterResourceBinding(crb, err)
var condition metav1.Condition
if err == nil {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
} else {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
}
if updateErr := s.patchClusterBindingScheduleStatus(crb, condition); updateErr != nil {
klog.Errorf("Failed to patch schedule status to ClusterResourceBinding(%s): %v", crb.Name, updateErr)
if err == nil {
// Schedule succeeded but the status update failed; return the error so the binding is retried in the next loop.
err = updateErr
}
}
}()
start := time.Now()
policyPlacement, policyPlacementStr, err := s.getClusterPlacement(crb)
if err != nil {
return err
}
if appliedPlacement := util.GetLabelValue(crb.Annotations, util.PolicyPlacementAnnotation); policyPlacementStr != appliedPlacement {
// policy placement changed, need schedule
klog.Infof("Start to schedule ClusterResourceBinding(%s) as placement changed", name)
err = s.scheduleClusterResourceBinding(crb)
metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
if policyPlacement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&crb.Spec, policyPlacement.ReplicaScheduling) {
// binding replicas changed, need reschedule
klog.Infof("Reschedule ClusterResourceBinding(%s) as replicas scaled down or scaled up", name)
err = s.scheduleClusterResourceBinding(crb)
metrics.BindingSchedule(string(ScaleSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
if crb.Spec.Replicas == 0 ||
policyPlacement.ReplicaScheduling == nil ||
policyPlacement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
// Duplicated resources should always be scheduled. Note: non-workload resources are considered
// duplicated even if the scheduling type is divided.
klog.V(3).Infof("Start to schedule ClusterResourceBinding(%s) as scheduling type is duplicated", name)
err = s.scheduleClusterResourceBinding(crb)
metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
// TODO(dddddai): reschedule bindings on cluster change
klog.Infof("Don't need to schedule ClusterResourceBinding(%s)", name)
return nil
}
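// scheduleResourceBinding runs the scheduling algorithm for a ResourceBinding and patches the
// suggested clusters back into its spec.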
func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding) (err error) {
klog.V(4).InfoS("Begin scheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))
defer klog.V(4).InfoS("End scheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))
placement, placementStr, err := s.getPlacement(resourceBinding)
if err != nil {
return err
}
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement, &resourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
if err != nil {
klog.Errorf("Failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err)
return err
}
klog.V(4).Infof("ResourceBinding %s/%s scheduled to clusters %v", resourceBinding.Namespace, resourceBinding.Name, scheduleResult.SuggestedClusters)
return s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
}
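// patchScheduleResultForResourceBinding merge-patches the schedule result and the applied
// placement annotation into the ResourceBinding.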
func (s *Scheduler) patchScheduleResultForResourceBinding(oldBinding *workv1alpha2.ResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
newBinding := oldBinding.DeepCopy()
if newBinding.Annotations == nil {
newBinding.Annotations = make(map[string]string)
}
newBinding.Annotations[util.PolicyPlacementAnnotation] = placement
newBinding.Spec.Clusters = scheduleResult
oldData, err := json.Marshal(oldBinding)
if err != nil {
return fmt.Errorf("failed to marshal the existing resource binding(%s/%s): %v", oldBinding.Namespace, oldBinding.Name, err)
}
newData, err := json.Marshal(newBinding)
if err != nil {
return fmt.Errorf("failed to marshal the new resource binding(%s/%s): %v", newBinding.Namespace, newBinding.Name, err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create a merge patch: %v", err)
}
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(newBinding.Namespace).Patch(context.TODO(), newBinding.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
return err
}
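// scheduleClusterResourceBinding runs the scheduling algorithm for a ClusterResourceBinding and
// patches the suggested clusters back into its spec.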
func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding) (err error) {
klog.V(4).InfoS("Begin scheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
defer klog.V(4).InfoS("End scheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
clusterPolicyName := util.GetLabelValue(clusterResourceBinding.Labels, policyv1alpha1.ClusterPropagationPolicyLabel)
policy, err := s.clusterPolicyLister.Get(clusterPolicyName)
if err != nil {
return err
}
placement, err := json.Marshal(policy.Spec.Placement)
if err != nil {
klog.Errorf("Failed to marshal placement of clusterPropagationPolicy %s, error: %v", policy.Name, err)
return err
}
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
if err != nil {
klog.V(2).Infof("Failed scheduling ClusterResourceBinding %s: %v", clusterResourceBinding.Name, err)
return err
}
klog.V(4).Infof("ClusterResourceBinding %s scheduled to clusters %v", clusterResourceBinding.Name, scheduleResult.SuggestedClusters)
return s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, string(placement), scheduleResult.SuggestedClusters)
}
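// patchScheduleResultForClusterResourceBinding merge-patches the schedule result and the applied
// placement annotation into the ClusterResourceBinding.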
func (s *Scheduler) patchScheduleResultForClusterResourceBinding(oldBinding *workv1alpha2.ClusterResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
newBinding := oldBinding.DeepCopy()
if newBinding.Annotations == nil {
newBinding.Annotations = make(map[string]string)
}
newBinding.Annotations[util.PolicyPlacementAnnotation] = placement
newBinding.Spec.Clusters = scheduleResult
oldData, err := json.Marshal(oldBinding)
if err != nil {
return fmt.Errorf("failed to marshal the existing cluster resource binding(%s): %v", oldBinding.Name, err)
}
newData, err := json.Marshal(newBinding)
if err != nil {
return fmt.Errorf("failed to marshal the new resource binding(%s): %v", newBinding.Name, err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create a merge patch: %v", err)
}
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Patch(context.TODO(), newBinding.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
return err
}
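// handleErr forgets the key on success (or when its namespace is terminating) and otherwise
// requeues it with rate limiting, counting the failed schedule attempt.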
func (s *Scheduler) handleErr(err error, key interface{}) {
if err == nil || apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
s.queue.Forget(key)
return
}
s.queue.AddRateLimited(key)
metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure)
}
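// reconcileEstimatorConnection establishes a connection to the scheduler estimator of the given
// cluster, and drops the cached connection once the cluster no longer exists.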
func (s *Scheduler) reconcileEstimatorConnection(key util.QueueKey) error {
name, ok := key.(string)
if !ok {
return fmt.Errorf("failed to reconcile estimator connection as invalid key: %v", key)
}
cluster, err := s.clusterLister.Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
s.schedulerEstimatorCache.DeleteCluster(name)
return nil
}
return err
}
if cluster.Spec.SyncMode == clusterv1alpha1.Pull && s.disableSchedulerEstimatorInPullMode {
return nil
}
return estimatorclient.EstablishConnection(s.KubeClient, name, s.schedulerEstimatorCache, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorPort)
}
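// establishEstimatorConnections connects to the scheduler estimators of all existing clusters
// before scheduling starts.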
func (s *Scheduler) establishEstimatorConnections() {
clusterList, err := s.KarmadaClient.ClusterV1alpha1().Clusters().List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Cannot list all clusters when establish all cluster estimator connections: %v", err)
return
}
for i := range clusterList.Items {
if clusterList.Items[i].Spec.SyncMode == clusterv1alpha1.Pull && s.disableSchedulerEstimatorInPullMode {
continue
}
if err = estimatorclient.EstablishConnection(s.KubeClient, clusterList.Items[i].Name, s.schedulerEstimatorCache, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorPort); err != nil {
klog.Error(err)
}
}
}
// patchBindingScheduleStatus patches schedule status of ResourceBinding when necessary.
func (s *Scheduler) patchBindingScheduleStatus(rb *workv1alpha2.ResourceBinding, newScheduledCondition metav1.Condition) error {
if rb == nil {
return nil
}
modifiedObj := rb.DeepCopy()
meta.SetStatusCondition(&modifiedObj.Status.Conditions, newScheduledCondition)
// Postpone setting the observed generation until the schedule succeeds, assuming the scheduler
// will retry and eventually succeed.
if newScheduledCondition.Status == metav1.ConditionTrue {
modifiedObj.Status.SchedulerObservedGeneration = modifiedObj.Generation
}
// Short path, ignore patch if no change.
if reflect.DeepEqual(rb.Status, modifiedObj.Status) {
return nil
}
patchBytes, err := helper.GenMergePatch(rb, modifiedObj)
if err != nil {
return fmt.Errorf("failed to create a merge patch: %v", err)
}
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).Patch(context.TODO(), rb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
klog.Errorf("Failed to patch schedule status to ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
return err
}
klog.V(4).Infof("Patch schedule status to ResourceBinding(%s/%s) succeed", rb.Namespace, rb.Name)
return nil
}
// patchClusterBindingScheduleStatus patches schedule status of ClusterResourceBinding when necessary
func (s *Scheduler) patchClusterBindingScheduleStatus(crb *workv1alpha2.ClusterResourceBinding, newScheduledCondition metav1.Condition) error {
if crb == nil {
return nil
}
modifiedObj := crb.DeepCopy()
meta.SetStatusCondition(&modifiedObj.Status.Conditions, newScheduledCondition)
// Postpone setting the observed generation until the schedule succeeds, assuming the scheduler
// will retry and eventually succeed.
if newScheduledCondition.Status == metav1.ConditionTrue {
modifiedObj.Status.SchedulerObservedGeneration = modifiedObj.Generation
}
// Short path, ignore patch if no change.
if reflect.DeepEqual(crb.Status, modifiedObj.Status) {
return nil
}
patchBytes, err := helper.GenMergePatch(crb, modifiedObj)
if err != nil {
return fmt.Errorf("failed to create a merge patch: %v", err)
}
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Patch(context.TODO(), crb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
klog.Errorf("Failed to patch schedule status to ClusterResourceBinding(%s): %v", crb.Name, err)
return err
}
klog.V(4).Infof("Patch schedule status to ClusterResourceBinding(%s) succeed", crb.Name)
return nil
}
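// recordScheduleResultEventForResourceBinding emits a success or failure event for the schedule
// result on both the ResourceBinding and the resource it references.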
func (s *Scheduler) recordScheduleResultEventForResourceBinding(rb *workv1alpha2.ResourceBinding, schedulerErr error) {
if rb == nil {
return
}
ref := &corev1.ObjectReference{
Kind: rb.Spec.Resource.Kind,
APIVersion: rb.Spec.Resource.APIVersion,
Namespace: rb.Spec.Resource.Namespace,
Name: rb.Spec.Resource.Name,
UID: rb.Spec.Resource.UID,
}
if schedulerErr == nil {
s.eventRecorder.Event(rb, corev1.EventTypeNormal, events.EventReasonScheduleBindingSucceed, scheduleSuccessMessage)
s.eventRecorder.Event(ref, corev1.EventTypeNormal, events.EventReasonScheduleBindingSucceed, scheduleSuccessMessage)
} else {
s.eventRecorder.Event(rb, corev1.EventTypeWarning, events.EventReasonScheduleBindingFailed, schedulerErr.Error())
s.eventRecorder.Event(ref, corev1.EventTypeWarning, events.EventReasonScheduleBindingFailed, schedulerErr.Error())
}
}
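// recordScheduleResultEventForClusterResourceBinding emits a success or failure event for the
// schedule result on both the ClusterResourceBinding and the resource it references.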
func (s *Scheduler) recordScheduleResultEventForClusterResourceBinding(crb *workv1alpha2.ClusterResourceBinding, schedulerErr error) {
if crb == nil {
return
}
ref := &corev1.ObjectReference{
Kind: crb.Spec.Resource.Kind,
APIVersion: crb.Spec.Resource.APIVersion,
Namespace: crb.Spec.Resource.Namespace,
Name: crb.Spec.Resource.Name,
UID: crb.Spec.Resource.UID,
}
if schedulerErr == nil {
s.eventRecorder.Event(crb, corev1.EventTypeNormal, events.EventReasonScheduleBindingSucceed, scheduleSuccessMessage)
s.eventRecorder.Event(ref, corev1.EventTypeNormal, events.EventReasonScheduleBindingSucceed, scheduleSuccessMessage)
} else {
s.eventRecorder.Event(crb, corev1.EventTypeWarning, events.EventReasonScheduleBindingFailed, schedulerErr.Error())
s.eventRecorder.Event(ref, corev1.EventTypeWarning, events.EventReasonScheduleBindingFailed, schedulerErr.Error())
}
}