Merge pull request #1383 from huone1/DeleteCluster

[feature]support rescheduling when deleting a cluster
karmada-bot 2022-03-04 15:13:14 +08:00 committed by GitHub
commit b33bda9709
6 changed files with 195 additions and 141 deletions

View File

@@ -34,10 +34,13 @@ func divideReplicasByResource(
spec *workv1alpha2.ResourceBindingSpec,
preference policyv1alpha1.ReplicaDivisionPreference,
) ([]workv1alpha2.TargetCluster, error) {
// Step 1: Get previous total sum of replicas.
assignedReplicas := util.GetSumOfReplicas(spec.Clusters)
// Step 1: Find the ready clusters that have old replicas
scheduledClusters := findOutScheduledCluster(spec.Clusters, clusters)
// Step 2: Check the scale type (up or down).
// Step 2: calculate the assigned Replicas in scheduledClusters
assignedReplicas := util.GetSumOfReplicas(scheduledClusters)
// Step 3: Check the scale type (up or down).
if assignedReplicas > spec.Replicas {
// We need to reduce the replicas in terms of the previous result.
newTargetClusters, err := scaleDownScheduleByReplicaDivisionPreference(spec, preference)
@@ -48,7 +51,7 @@ func divideReplicasByResource(
} else if assignedReplicas < spec.Replicas {
// We need to enlarge the replicas in terms of the previous result (if exists).
// First scheduling is considered as a special kind of scaling up.
newTargetClusters, err := scaleUpScheduleByReplicaDivisionPreference(clusters, spec, preference)
newTargetClusters, err := scaleUpScheduleByReplicaDivisionPreference(clusters, spec, preference, scheduledClusters, assignedReplicas)
if err != nil {
return nil, fmt.Errorf("failed to scaleUp: %v", err)
}
@@ -202,24 +205,20 @@ func scaleUpScheduleByReplicaDivisionPreference(
clusters []*clusterv1alpha1.Cluster,
spec *workv1alpha2.ResourceBindingSpec,
preference policyv1alpha1.ReplicaDivisionPreference,
scheduledClusters []workv1alpha2.TargetCluster,
assignedReplicas int32,
) ([]workv1alpha2.TargetCluster, error) {
// Step 1: Find the clusters that have old replicas, so we can prefer to assign new replicas towards them.
scheduledClusters := findOutScheduledCluster(spec.Clusters, clusters)
// Step 2: calculate the assigned Replicas in scheduledClusters
assignedReplicas := util.GetSumOfReplicas(scheduledClusters)
// Step 3: Get how many replicas should be scheduled in this cycle and construct a new object if necessary
// Step 1: Get how many replicas should be scheduled in this cycle and construct a new object if necessary
newSpec := spec
if assignedReplicas > 0 {
newSpec = spec.DeepCopy()
newSpec.Replicas = spec.Replicas - assignedReplicas
}
// Step 4: Calculate available replicas of all candidates
// Step 2: Calculate available replicas of all candidates
clusterAvailableReplicas := calAvailableReplicas(clusters, newSpec)
// Step 5: Begin dividing.
// Step 3: Begin dividing.
// Only the new replicas are considered during this scheduler, the old replicas will not be moved.
// If not, the old replicas may be recreated which is not expected during scaling up.
// The parameter `scheduledClusterNames` is used to make sure that we assign new replicas to them preferentially
@@ -230,6 +229,6 @@ func scaleUpScheduleByReplicaDivisionPreference(
return result, err
}
// Step 6: Merge the result of previous and new results.
// Step 4: Merge the result of previous and new results.
return util.MergeTargetClusters(scheduledClusters, result), nil
}
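
The two hunks above move the "which clusters still hold replicas" bookkeeping ahead of the scale decision: assigned replicas are now summed only over previously scheduled clusters that are still ready candidates, so replicas sitting on a deleted cluster no longer count and are re-placed during scale-up. A minimal, self-contained sketch of that accounting, using simplified stand-ins for findOutScheduledCluster and util.GetSumOfReplicas (the names and types below are illustrative, not the real karmada APIs):

package main

import "fmt"

// targetCluster mirrors the shape of workv1alpha2.TargetCluster for this sketch.
type targetCluster struct {
	Name     string
	Replicas int32
}

// keepScheduledOnReady is a simplified stand-in for findOutScheduledCluster:
// it keeps only previously scheduled clusters that are still ready candidates.
func keepScheduledOnReady(scheduled []targetCluster, ready map[string]bool) []targetCluster {
	var out []targetCluster
	for _, c := range scheduled {
		if ready[c.Name] {
			out = append(out, c)
		}
	}
	return out
}

// sumOfReplicas is a simplified stand-in for util.GetSumOfReplicas.
func sumOfReplicas(clusters []targetCluster) int32 {
	var sum int32
	for _, c := range clusters {
		sum += c.Replicas
	}
	return sum
}

func main() {
	previous := []targetCluster{{Name: "member1", Replicas: 2}, {Name: "member2", Replicas: 3}}
	ready := map[string]bool{"member1": true} // member2 has been deleted/unjoined
	desired := int32(5)

	scheduledClusters := keepScheduledOnReady(previous, ready)
	assignedReplicas := sumOfReplicas(scheduledClusters) // 2, not 5: the deleted cluster's replicas no longer count

	// assignedReplicas < desired, so this cycle is treated as a scale-up: only the
	// missing replicas are divided among available clusters and then merged with
	// the still-valid previous assignments (util.MergeTargetClusters in the real code).
	fmt.Println("replicas to place this cycle:", desired-assignedReplicas) // 3
}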

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@@ -21,7 +20,6 @@ import (
// ScheduleAlgorithm is the interface that should be implemented to schedule a resource to the target clusters.
type ScheduleAlgorithm interface {
Schedule(context.Context, *policyv1alpha1.Placement, *workv1alpha2.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
FailoverSchedule(context.Context, *policyv1alpha1.Placement, *workv1alpha2.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
}
// ScheduleResult includes the clusters selected.
@@ -246,53 +244,3 @@ func (g *genericScheduler) assignReplicas(
}
return targetClusters, nil
}
func (g *genericScheduler) FailoverSchedule(ctx context.Context, placement *policyv1alpha1.Placement,
spec *workv1alpha2.ResourceBindingSpec) (result ScheduleResult, err error) {
readyClusters := g.schedulerCache.Snapshot().GetReadyClusterNames()
totalClusters := util.ConvertToClusterNames(spec.Clusters)
reservedClusters := calcReservedCluster(totalClusters, readyClusters)
availableClusters := calcAvailableCluster(totalClusters, readyClusters)
candidateClusters := sets.NewString()
for clusterName := range availableClusters {
clusterObj := g.schedulerCache.Snapshot().GetCluster(clusterName)
if clusterObj == nil {
return result, fmt.Errorf("failed to get clusterObj by clusterName: %s", clusterName)
}
if result := g.scheduleFramework.RunFilterPlugins(ctx, placement, &spec.Resource, clusterObj.Cluster()); !result.IsSuccess() {
klog.V(4).Infof("cluster %q is not fit", clusterName)
} else {
candidateClusters.Insert(clusterName)
}
}
klog.V(4).Infof("Reserved bindingClusters : %v", reservedClusters.List())
klog.V(4).Infof("Candidate bindingClusters: %v", candidateClusters.List())
// TODO: should schedule as much as possible?
deltaLen := len(spec.Clusters) - len(reservedClusters)
if len(candidateClusters) < deltaLen {
// for ReplicaSchedulingTypeDivided, we will try to migrate replicas to the other health clusters
if placement.ReplicaScheduling == nil || placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
klog.Warningf("ignore reschedule binding as insufficient available cluster")
return ScheduleResult{}, nil
}
}
// TODO: check if the final result meets the spread constraints.
targetClusters := reservedClusters
clusterList := candidateClusters.List()
for i := 0; i < deltaLen && i < len(candidateClusters); i++ {
targetClusters.Insert(clusterList[i])
}
var reScheduleResult []workv1alpha2.TargetCluster
for cluster := range targetClusters {
reScheduleResult = append(reScheduleResult, workv1alpha2.TargetCluster{Name: cluster})
}
return ScheduleResult{reScheduleResult}, nil
}

View File

@@ -120,13 +120,3 @@ func resortClusterList(clusterAvailableReplicas []workv1alpha2.TargetCluster, sc
klog.V(4).Infof("Resorted target cluster: %v", clusterAvailableReplicas)
return clusterAvailableReplicas
}
// calcReservedCluster eliminates the not-ready clusters from the 'bindClusters'.
func calcReservedCluster(bindClusters, readyClusters sets.String) sets.String {
return bindClusters.Difference(bindClusters.Difference(readyClusters))
}
// calcAvailableCluster returns a list of ready clusters that not in 'bindClusters'.
func calcAvailableCluster(bindCluster, readyClusters sets.String) sets.String {
return readyClusters.Difference(bindCluster)
}
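
For reference, the two helpers deleted above are plain set arithmetic: the double difference in calcReservedCluster is just the intersection of the bound and ready clusters, and calcAvailableCluster is the set of ready clusters that are not bound yet. A small runnable illustration (the member cluster names are made up for the example):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	bindClusters := sets.NewString("member1", "member2", "member3")
	readyClusters := sets.NewString("member2", "member3", "member4")

	// Reserved: bound clusters that are still ready, i.e. bindClusters ∩ readyClusters.
	reserved := bindClusters.Difference(bindClusters.Difference(readyClusters))

	// Available: ready clusters that are not bound yet.
	available := readyClusters.Difference(bindClusters)

	fmt.Println(reserved.List())  // [member2 member3]
	fmt.Println(available.List()) // [member4]
}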

View File

@@ -277,8 +277,12 @@ func (s *Scheduler) deleteCluster(obj interface{}) {
klog.Errorf("cannot convert to clusterv1alpha1.Cluster: %v", t)
return
}
klog.V(3).Infof("Delete event for cluster %s", cluster.Name)
s.enqueueAffectedBinding(cluster.Name)
s.enqueueAffectedClusterBinding(cluster.Name)
if s.enableSchedulerEstimator {
s.schedulerEstimatorWorker.Add(cluster.Name)
}
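
The hunk above wires cluster deletion into rescheduling: besides notifying the scheduler estimator worker, the delete handler now enqueues every ResourceBinding and ClusterResourceBinding that targets the removed cluster. A stripped-down sketch of the handler pattern, using local stand-ins instead of the real clusterv1alpha1.Cluster and the informer tombstone type:

package main

import "fmt"

// cluster is a local stand-in for clusterv1alpha1.Cluster (name only).
type cluster struct{ Name string }

// tombstone is a local stand-in for cache.DeletedFinalStateUnknown, which an
// informer can deliver when the final state of a deleted object was missed.
type tombstone struct{ Obj interface{} }

// deleteCluster recovers the cluster from either the object or a tombstone and
// enqueues everything scheduled to it so those replicas can be moved elsewhere.
func deleteCluster(obj interface{}, enqueueAffected func(clusterName string)) {
	var c *cluster
	switch t := obj.(type) {
	case *cluster:
		c = t
	case tombstone:
		inner, ok := t.Obj.(*cluster)
		if !ok {
			fmt.Printf("cannot convert to cluster: %v\n", t.Obj)
			return
		}
		c = inner
	default:
		fmt.Printf("cannot convert to cluster: %v\n", t)
		return
	}
	enqueueAffected(c.Name)
}

func main() {
	deleteCluster(&cluster{Name: "member2"}, func(name string) {
		fmt.Println("requeue bindings scheduled to", name)
	})
}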

View File

@@ -297,9 +297,10 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) (err error) {
klog.Infof("Don't need to schedule ResourceBinding(%s/%s)", namespace, name)
return nil
}
if features.FeatureGate.Enabled(features.Failover) {
klog.Infof("Reschedule ResourceBinding(%s/%s) as cluster failure", namespace, name)
err = s.rescheduleResourceBinding(rb)
klog.Infof("Reschedule ResourceBinding(%s/%s) as cluster failure or deletion", namespace, name)
err = s.scheduleResourceBinding(rb)
metrics.BindingSchedule(string(FailoverSchedule), metrics.SinceInSeconds(start), err)
return err
}
@@ -360,8 +361,8 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
return nil
}
if features.FeatureGate.Enabled(features.Failover) {
klog.Infof("Reschedule ClusterResourceBinding(%s) as cluster failure", name)
err = s.rescheduleClusterResourceBinding(crb)
klog.Infof("Reschedule ClusterResourceBinding(%s) as cluster failure or deletion", name)
err = s.scheduleClusterResourceBinding(crb)
metrics.BindingSchedule(string(FailoverSchedule), metrics.SinceInSeconds(start), err)
return err
}
@@ -447,73 +448,27 @@ func (s *Scheduler) handleErr(err error, key interface{}) {
metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure)
}
func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding) error {
klog.V(4).InfoS("Begin rescheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
defer klog.V(4).InfoS("End rescheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
policyName := util.GetLabelValue(clusterResourceBinding.Labels, policyv1alpha1.ClusterPropagationPolicyLabel)
policy, err := s.clusterPolicyLister.Get(policyName)
if err != nil {
klog.Errorf("Failed to get policy by policyName(%s): Error: %v", policyName, err)
return err
}
reScheduleResult, err := s.Algorithm.FailoverSchedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec)
if err != nil {
return err
}
if len(reScheduleResult.SuggestedClusters) == 0 {
return nil
}
clusterResourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
klog.Infof("The final binding.Spec.Cluster values are: %v\n", clusterResourceBinding.Spec.Clusters)
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding) error {
klog.V(4).InfoS("Begin rescheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))
defer klog.V(4).InfoS("End rescheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))
placement, _, err := s.getPlacement(resourceBinding)
if err != nil {
klog.Errorf("Failed to get placement by resourceBinding(%s/%s): Error: %v", resourceBinding.Namespace, resourceBinding.Name, err)
return err
}
reScheduleResult, err := s.Algorithm.FailoverSchedule(context.TODO(), &placement, &resourceBinding.Spec)
if err != nil {
return err
}
if len(reScheduleResult.SuggestedClusters) == 0 {
return nil
}
resourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
klog.Infof("The final binding.Spec.Cluster values are: %v\n", resourceBinding.Spec.Clusters)
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
func (s *Scheduler) allClustersInReadyState(tcs []workv1alpha2.TargetCluster) bool {
clusters := s.schedulerCache.Snapshot().GetClusters()
for i := range tcs {
isNoExisted := true
for _, c := range clusters {
if c.Cluster().Name == tcs[i].Name {
if meta.IsStatusConditionPresentAndEqual(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
return false
}
if c.Cluster().Name != tcs[i].Name {
continue
}
isNoExisted = false
if meta.IsStatusConditionFalse(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady) ||
!c.Cluster().DeletionTimestamp.IsZero() {
return false
}
break
}
if isNoExisted {
// don't find the target cluster in snapshot because it might have been deleted
return false
}
}
return true
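
The rewritten loop above closes two gaps in the readiness check: a cluster that is being deleted (non-zero DeletionTimestamp) and a cluster that is missing from the snapshot altogether (already deleted) both now count as not ready, so the binding falls into the failover/reschedule path. A self-contained sketch of the same decision with simplified snapshot types (the real code walks the scheduler cache and cluster conditions):

package main

import "fmt"

// clusterState is a simplified stand-in for what the scheduler snapshot exposes.
type clusterState struct {
	Name     string
	Ready    bool
	Deleting bool
}

// allClustersReady mirrors the corrected loop: a target cluster only counts as
// ready if it is present in the snapshot, its Ready condition is not false,
// and it is not being deleted; a missing cluster fails the check.
func allClustersReady(targets []string, snapshot []clusterState) bool {
	for _, name := range targets {
		found := false
		for _, c := range snapshot {
			if c.Name != name {
				continue
			}
			found = true
			if !c.Ready || c.Deleting {
				return false
			}
			break
		}
		if !found {
			// The cluster is absent from the snapshot, most likely deleted.
			return false
		}
	}
	return true
}

func main() {
	snapshot := []clusterState{{Name: "member1", Ready: true}}
	fmt.Println(allClustersReady([]string{"member1"}, snapshot))            // true
	fmt.Println(allClustersReady([]string{"member1", "member2"}, snapshot)) // false: member2 is gone
}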

View File

@@ -0,0 +1,158 @@
package e2e
import (
"fmt"
"os"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/karmadactl"
"github.com/karmada-io/karmada/pkg/karmadactl/options"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/test/e2e/framework"
testhelper "github.com/karmada-io/karmada/test/helper"
)
// reschedule testing is used to test the rescheduling situation when some initially scheduled clusters are unjoined
var _ = ginkgo.Describe("reschedule testing", func() {
ginkgo.Context("Deployment propagation testing", func() {
policyNamespace := testNamespace
policyName := deploymentNamePrefix + rand.String(RandomStrLength)
deploymentNamespace := testNamespace
deploymentName := policyName
deployment := testhelper.NewDeployment(deploymentNamespace, deploymentName)
maxGroups := 1
minGroups := 1
numOfUnjoinedClusters := 1
// set MaxGroups=MinGroups=1, label is sync-mode=Push.
policy := testhelper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
{
APIVersion: deployment.APIVersion,
Kind: deployment.Kind,
Name: deployment.Name,
},
}, policyv1alpha1.Placement{
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
LabelSelector: &metav1.LabelSelector{
MatchLabels: pushModeClusterLabels,
},
},
SpreadConstraints: []policyv1alpha1.SpreadConstraint{
{
SpreadByField: policyv1alpha1.SpreadByFieldCluster,
MaxGroups: maxGroups,
MinGroups: minGroups,
},
},
})
ginkgo.It("deployment reschedule testing", func() {
framework.CreatePropagationPolicy(karmadaClient, policy)
framework.CreateDeployment(kubeClient, deployment)
var unjoinedClusters []string
targetClusterNames := framework.ExtractTargetClustersFrom(controlPlaneClient, deployment)
ginkgo.By("unjoin target cluster", func() {
count := numOfUnjoinedClusters
for _, targetClusterName := range targetClusterNames {
if count == 0 {
break
}
count--
klog.Infof("Unjoining cluster %q.", targetClusterName)
karmadaConfig := karmadactl.NewKarmadaConfig(clientcmd.NewDefaultPathOptions())
opts := karmadactl.CommandUnjoinOption{
GlobalCommandOptions: options.GlobalCommandOptions{
KubeConfig: fmt.Sprintf("%s/.kube/karmada.config", os.Getenv("HOME")),
KarmadaContext: "karmada-apiserver",
},
ClusterNamespace: "karmada-cluster",
ClusterName: targetClusterName,
}
err := karmadactl.RunUnjoin(os.Stdout, karmadaConfig, opts)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
unjoinedClusters = append(unjoinedClusters, targetClusterName)
}
})
ginkgo.By("check whether the deployment is rescheduled to other available clusters", func() {
gomega.Eventually(func(g gomega.Gomega) {
totalNum := 0
targetClusterNames = framework.ExtractTargetClustersFrom(controlPlaneClient, deployment)
for _, targetClusterName := range targetClusterNames {
// the target cluster should be overwritten to another available cluster
g.Expect(isUnjoined(targetClusterName, unjoinedClusters)).Should(gomega.BeFalse())
framework.WaitDeploymentPresentOnClusterFitWith(targetClusterName, deployment.Namespace, deployment.Name,
func(deployment *appsv1.Deployment) bool {
return true
})
totalNum++
}
g.Expect(totalNum == maxGroups).Should(gomega.BeTrue())
}, pollTimeout, pollInterval).Should(gomega.Succeed())
})
ginkgo.By("check if the scheduled condition is true", func() {
err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
rb, err := getResourceBinding(deployment)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
return meta.IsStatusConditionTrue(rb.Status.Conditions, workv1alpha2.Scheduled), nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
ginkgo.By("rejoin the unjoined clusters", func() {
for _, unjoinedCluster := range unjoinedClusters {
fmt.Printf("cluster %q is waiting for rejoining\n", unjoinedCluster)
karmadaConfig := karmadactl.NewKarmadaConfig(clientcmd.NewDefaultPathOptions())
opts := karmadactl.CommandJoinOption{
GlobalCommandOptions: options.GlobalCommandOptions{
KubeConfig: fmt.Sprintf("%s/.kube/karmada.config", os.Getenv("HOME")),
KarmadaContext: "karmada-apiserver",
},
ClusterNamespace: "karmada-cluster",
ClusterName: unjoinedCluster,
ClusterContext: unjoinedCluster,
ClusterKubeConfig: fmt.Sprintf("%s/.kube/members.config", os.Getenv("HOME")),
}
err := karmadactl.RunJoin(os.Stdout, karmadaConfig, opts)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
fmt.Printf("waiting for cluster %q ready\n", unjoinedCluster)
framework.WaitClusterFitWith(controlPlaneClient, unjoinedCluster, func(cluster *clusterv1alpha1.Cluster) bool {
return util.IsClusterReady(&cluster.Status)
})
}
})
framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name)
framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
})
})
})
// indicate if the cluster is unjoined
func isUnjoined(clusterName string, disabledClusters []string) bool {
for _, cluster := range disabledClusters {
if cluster == clusterName {
return true
}
}
return false
}