Reschedule ResourceBinding when adding a cluster
Signed-off-by: chaunceyjiang <chaunceyjiang@gmail.com>
parent c769381c28
commit 90ce5e004e
@@ -187,23 +187,84 @@ func (s *Scheduler) addCluster(obj interface{}) {
		return
	}
	klog.V(3).Infof("Add event for cluster %s", cluster.Name)

	if s.enableSchedulerEstimator {
		s.schedulerEstimatorWorker.Add(cluster.Name)
	}
}

-func (s *Scheduler) updateCluster(_, newObj interface{}) {
+func (s *Scheduler) updateCluster(oldObj, newObj interface{}) {
	newCluster, ok := newObj.(*clusterv1alpha1.Cluster)
	if !ok {
		klog.Errorf("cannot convert newObj to Cluster: %v", newObj)
		return
	}
	oldCluster, ok := oldObj.(*clusterv1alpha1.Cluster)
	if !ok {
		klog.Errorf("cannot convert oldObj to Cluster: %v", oldObj)
		return
	}
klog.V(3).Infof("Update event for cluster %s", newCluster.Name)
|
||||
|
||||
if s.enableSchedulerEstimator {
|
||||
s.schedulerEstimatorWorker.Add(newCluster.Name)
|
||||
}
|
||||
|
||||
switch {
|
||||
case !equality.Semantic.DeepEqual(oldCluster.Labels, newCluster.Labels):
|
||||
fallthrough
|
||||
case !equality.Semantic.DeepEqual(oldCluster.Spec, newCluster.Spec):
|
||||
s.enqueueAffectedPolicy(newCluster)
|
||||
s.enqueueAffectedClusterPolicy(newCluster)
|
||||
}
|
||||
}
|
||||
|
||||
// enqueueAffectedPolicy finds all PropagationPolicies related to the cluster and requeues the affected ResourceBindings.
func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
	policies, _ := s.policyLister.List(labels.Everything())
	for _, policy := range policies {
		selector := labels.SelectorFromSet(labels.Set{
			policyv1alpha1.PropagationPolicyNamespaceLabel: policy.Namespace,
			policyv1alpha1.PropagationPolicyNameLabel:      policy.Name,
		})
		affinity := policy.Spec.Placement.ClusterAffinity
		switch {
		case affinity == nil:
			// If no clusters are specified, enqueue it.
			fallthrough
		case util.ClusterMatches(newCluster, *affinity):
			// If the cluster matches the affinity, enqueue it.
			err := s.requeueResourceBindings(selector)
			if err != nil {
				klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
			}
		}
	}
}

// enqueueAffectedClusterPolicy finds all ClusterPropagationPolicies related to the cluster and requeues the affected ResourceBindings/ClusterResourceBindings.
func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Cluster) {
	clusterPolicies, _ := s.clusterPolicyLister.List(labels.Everything())
	for _, policy := range clusterPolicies {
		selector := labels.SelectorFromSet(labels.Set{
			policyv1alpha1.ClusterPropagationPolicyLabel: policy.Name,
		})
		affinity := policy.Spec.Placement.ClusterAffinity
		switch {
		case affinity == nil:
			// If no clusters are specified, enqueue it.
			fallthrough
		case util.ClusterMatches(newCluster, *affinity):
			// If the cluster matches the affinity, enqueue it.
			err := s.requeueClusterResourceBindings(selector)
			if err != nil {
				klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)
			}
			err = s.requeueResourceBindings(selector)
			if err != nil {
				klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
			}
		}
	}
}

func (s *Scheduler) deleteCluster(obj interface{}) {

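For context on the two enqueue helpers above: they hand a label selector to requeueResourceBindings / requeueClusterResourceBindings, which are not part of this hunk. Below is a minimal sketch of what such a requeue helper is assumed to look like; only the shape of the call (selector in, error out) comes from this diff, while the bindingLister and queue fields, and the use of cache.MetaNamespaceKeyFunc from k8s.io/client-go/tools/cache, are illustrative assumptions.

// Illustrative sketch only (not part of this commit): one plausible shape for
// requeueResourceBindings, assuming the scheduler keeps a ResourceBinding lister
// and a work queue. Field names here are assumptions.
func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error {
	// List every ResourceBinding carrying the policy labels in the selector.
	bindings, err := s.bindingLister.List(selector)
	if err != nil {
		return err
	}
	for _, binding := range bindings {
		// Re-add the binding's key so the scheduling loop picks it up again.
		key, err := cache.MetaNamespaceKeyFunc(binding)
		if err != nil {
			return err
		}
		s.queue.Add(key)
	}
	return nil
}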
@@ -25,7 +25,7 @@ import (
)

// reschedule testing is used to test the rescheduling situation when some initially scheduled clusters are unjoined
-var _ = ginkgo.Describe("reschedule testing", func() {
+var _ = ginkgo.Describe("[cluster unjoined] reschedule testing", func() {
	framework.SerialContext("Deployment propagation testing", ginkgo.Label(NeedCreateCluster), func() {
		var newClusterName string
		var homeDir string

@@ -145,3 +145,182 @@ var _ = ginkgo.Describe("reschedule testing", func() {
		})
	})
})

// reschedule testing is used to test the rescheduling situation when some clusters are joined and recovered
var _ = ginkgo.Describe("[cluster joined] reschedule testing", func() {
	framework.SerialContext("Deployment propagation testing", ginkgo.Label(NeedCreateCluster), func() {
		var newClusterName string
		var homeDir string
		var kubeConfigPath string
		var controlPlane string
		var clusterContext string
		var initClusterNames []string

		ginkgo.BeforeEach(func() {
			newClusterName = "member-e2e-" + rand.String(3)
			homeDir = os.Getenv("HOME")
			kubeConfigPath = fmt.Sprintf("%s/.kube/%s.config", homeDir, newClusterName)
			controlPlane = fmt.Sprintf("%s-control-plane", newClusterName)
			clusterContext = fmt.Sprintf("kind-%s", newClusterName)
		})

		ginkgo.BeforeEach(func() {
			ginkgo.By(fmt.Sprintf("Creating cluster: %s", newClusterName), func() {
				err := createCluster(newClusterName, kubeConfigPath, controlPlane, clusterContext)
				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
			})
		})

		ginkgo.AfterEach(func() {
			ginkgo.By(fmt.Sprintf("Unjoining cluster: %s", newClusterName), func() {
				karmadaConfig := karmadactl.NewKarmadaConfig(clientcmd.NewDefaultPathOptions())
				opts := karmadactl.CommandUnjoinOption{
					GlobalCommandOptions: options.GlobalCommandOptions{
						KubeConfig:     fmt.Sprintf("%s/.kube/karmada.config", os.Getenv("HOME")),
						KarmadaContext: "karmada-apiserver",
					},
					ClusterNamespace: "karmada-cluster",
					ClusterName:      newClusterName,
				}
				err := karmadactl.RunUnjoin(karmadaConfig, opts)
				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
			})
			ginkgo.By(fmt.Sprintf("Deleting cluster: %s", newClusterName), func() {
				err := deleteCluster(newClusterName, kubeConfigPath)
				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
				_ = os.Remove(kubeConfigPath)
			})
		})

		var policyNamespace, policyName string
		var deploymentNamespace, deploymentName string
		var deployment *appsv1.Deployment
		var policy *policyv1alpha1.PropagationPolicy
		ginkgo.Context("testing the ReplicaSchedulingType of the policy is Duplicated", func() {
			ginkgo.BeforeEach(func() {
				policyNamespace = testNamespace
				policyName = deploymentNamePrefix + rand.String(RandomStrLength)
				deploymentNamespace = testNamespace
				deploymentName = policyName
				deployment = testhelper.NewDeployment(deploymentNamespace, deploymentName)
				deployment.Spec.Replicas = pointer.Int32Ptr(1)
				// set ReplicaSchedulingType=Duplicated.
				policy = testhelper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
					{
						APIVersion: deployment.APIVersion,
						Kind:       deployment.Kind,
						Name:       deployment.Name,
					},
				}, policyv1alpha1.Placement{
					ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
						ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDuplicated,
					},
				})
			})

ginkgo.It("when the ReplicaSchedulingType of the policy is Duplicated, reschedule testing", func() {
|
||||
ginkgo.By("create deployment and policy")
|
||||
framework.CreatePropagationPolicy(karmadaClient, policy)
|
||||
framework.CreateDeployment(kubeClient, deployment)
|
||||
ginkgo.DeferCleanup(func() {
|
||||
framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name)
|
||||
framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
|
||||
})
|
||||
|
||||
ginkgo.By(fmt.Sprintf("Joinning cluster: %s", newClusterName))
|
||||
karmadaConfig := karmadactl.NewKarmadaConfig(clientcmd.NewDefaultPathOptions())
|
||||
opts := karmadactl.CommandJoinOption{
|
||||
DryRun: false,
|
||||
ClusterNamespace: "karmada-cluster",
|
||||
ClusterName: newClusterName,
|
||||
ClusterContext: clusterContext,
|
||||
ClusterKubeConfig: kubeConfigPath,
|
||||
}
|
||||
err := karmadactl.RunJoin(karmadaConfig, opts)
|
||||
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||
|
||||
// wait for the current cluster status changing to true
|
||||
framework.WaitClusterFitWith(controlPlaneClient, newClusterName, func(cluster *clusterv1alpha1.Cluster) bool {
|
||||
return meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
|
||||
})
|
||||
|
||||
ginkgo.By("check whether the deployment is rescheduled to a new cluster")
|
||||
gomega.Eventually(func(g gomega.Gomega) bool {
|
||||
targetClusterNames := framework.ExtractTargetClustersFrom(controlPlaneClient, deployment)
|
||||
return !testhelper.IsExclude(newClusterName, targetClusterNames)
|
||||
}, pollTimeout, pollInterval).Should(gomega.BeTrue())
|
||||
})
|
||||
})
|
||||
|
||||
ginkgo.Context("testing clusterAffinity of the policy", func() {
|
||||
ginkgo.BeforeEach(func() {
|
||||
initClusterNames = []string{"member1", "member2", newClusterName}
|
||||
policyNamespace = testNamespace
|
||||
policyName = deploymentNamePrefix + rand.String(RandomStrLength)
|
||||
deploymentNamespace = testNamespace
|
||||
deploymentName = policyName
|
||||
deployment = testhelper.NewDeployment(deploymentNamespace, deploymentName)
|
||||
deployment.Spec.Replicas = pointer.Int32Ptr(1)
|
||||
// set clusterAffinity for Placement.
|
||||
policy = testhelper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
|
||||
{
|
||||
APIVersion: deployment.APIVersion,
|
||||
Kind: deployment.Kind,
|
||||
Name: deployment.Name,
|
||||
},
|
||||
}, policyv1alpha1.Placement{
|
||||
ClusterAffinity: &policyv1alpha1.ClusterAffinity{ClusterNames: initClusterNames},
|
||||
})
|
||||
})
|
||||
ginkgo.It("when the ReplicaScheduling of the policy is nil, reschedule testing", func() {
|
||||
ginkgo.By("create deployment and policy")
|
||||
|
||||
framework.CreatePropagationPolicy(karmadaClient, policy)
|
||||
|
||||
framework.CreateDeployment(kubeClient, deployment)
|
||||
ginkgo.DeferCleanup(func() {
|
||||
framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name)
|
||||
framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
|
||||
})
|
||||
gomega.Eventually(func(g gomega.Gomega) bool {
|
||||
targetClusterNames := framework.ExtractTargetClustersFrom(controlPlaneClient, deployment)
|
||||
return testhelper.IsExclude(newClusterName, targetClusterNames)
|
||||
}, pollTimeout, pollInterval).Should(gomega.BeTrue())
|
||||
|
||||
ginkgo.By(fmt.Sprintf("Joinning cluster: %s", newClusterName))
|
||||
karmadaConfig := karmadactl.NewKarmadaConfig(clientcmd.NewDefaultPathOptions())
|
||||
opts := karmadactl.CommandJoinOption{
|
||||
DryRun: false,
|
||||
ClusterNamespace: "karmada-cluster",
|
||||
ClusterName: newClusterName,
|
||||
ClusterContext: clusterContext,
|
||||
ClusterKubeConfig: kubeConfigPath,
|
||||
}
|
||||
err := karmadactl.RunJoin(karmadaConfig, opts)
|
||||
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||
|
||||
// wait for the current cluster status changing to true
|
||||
framework.WaitClusterFitWith(controlPlaneClient, newClusterName, func(cluster *clusterv1alpha1.Cluster) bool {
|
||||
return meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
|
||||
})
|
||||
|
||||
ginkgo.By("check whether the deployment is rescheduled to a new cluster")
|
||||
gomega.Eventually(func(g gomega.Gomega) bool {
|
||||
targetClusterNames := framework.ExtractTargetClustersFrom(controlPlaneClient, deployment)
|
||||
for _, clusterName := range initClusterNames {
|
||||
if testhelper.IsExclude(clusterName, targetClusterNames) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}, pollTimeout, pollInterval).Should(gomega.BeTrue())
|
||||
|
||||
gomega.Eventually(func(g gomega.Gomega) bool {
|
||||
targetClusterNames := framework.ExtractTargetClustersFrom(controlPlaneClient, deployment)
|
||||
return testhelper.IsExclude("member3", targetClusterNames)
|
||||
}, pollTimeout, pollInterval).Should(gomega.BeTrue())
|
||||
|
||||
})
|
||||
})
|
||||
})
|
||||
})
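A note on the assertions above: the tests read testhelper.IsExclude(name, clusters) as "name is not among clusters", so !IsExclude(newClusterName, targetClusterNames) asserts that the new cluster was picked up by the scheduler. A minimal sketch of that assumed semantic, inferred only from how the helper is used here:

// Sketch of the assumed behavior of testhelper.IsExclude (not part of this commit):
// it reports whether clusterName does not appear in targetClusterNames.
func IsExclude(clusterName string, targetClusterNames []string) bool {
	for _, target := range targetClusterNames {
		if target == clusterName {
			return false
		}
	}
	return true
}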