diff --git a/pkg/scheduler/event_handler.go b/pkg/scheduler/event_handler.go
index 1e10ef5b0..e6e7477b8 100644
--- a/pkg/scheduler/event_handler.go
+++ b/pkg/scheduler/event_handler.go
@@ -187,23 +187,84 @@ func (s *Scheduler) addCluster(obj interface{}) {
 		return
 	}
 	klog.V(3).Infof("Add event for cluster %s", cluster.Name)
-
 	if s.enableSchedulerEstimator {
 		s.schedulerEstimatorWorker.Add(cluster.Name)
 	}
 }
 
-func (s *Scheduler) updateCluster(_, newObj interface{}) {
+func (s *Scheduler) updateCluster(oldObj, newObj interface{}) {
 	newCluster, ok := newObj.(*clusterv1alpha1.Cluster)
 	if !ok {
 		klog.Errorf("cannot convert newObj to Cluster: %v", newObj)
 		return
 	}
+	oldCluster, ok := oldObj.(*clusterv1alpha1.Cluster)
+	if !ok {
+		klog.Errorf("cannot convert oldObj to Cluster: %v", oldObj)
+		return
+	}
 	klog.V(3).Infof("Update event for cluster %s", newCluster.Name)
 	if s.enableSchedulerEstimator {
 		s.schedulerEstimatorWorker.Add(newCluster.Name)
 	}
+
+	switch {
+	case !equality.Semantic.DeepEqual(oldCluster.Labels, newCluster.Labels):
+		fallthrough
+	case !equality.Semantic.DeepEqual(oldCluster.Spec, newCluster.Spec):
+		s.enqueueAffectedPolicy(newCluster)
+		s.enqueueAffectedClusterPolicy(newCluster)
+	}
+}
+
+// enqueueAffectedPolicy finds all propagation policies related to the cluster and requeues the ResourceBindings they own for rescheduling.
+func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
+	policies, _ := s.policyLister.List(labels.Everything())
+	for _, policy := range policies {
+		selector := labels.SelectorFromSet(labels.Set{
+			policyv1alpha1.PropagationPolicyNamespaceLabel: policy.Namespace,
+			policyv1alpha1.PropagationPolicyNameLabel:      policy.Name,
+		})
+		affinity := policy.Spec.Placement.ClusterAffinity
+		switch {
+		case affinity == nil:
+			// If no cluster affinity is specified, any cluster change affects the policy, so requeue.
+			fallthrough
+		case util.ClusterMatches(newCluster, *affinity):
+			// If the cluster matches the affinity, requeue the bindings owned by the policy.
+			err := s.requeueResourceBindings(selector)
+			if err != nil {
+				klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
+			}
+		}
+	}
+}
+
+// enqueueAffectedClusterPolicy finds all cluster propagation policies related to the cluster and requeues the ResourceBindings/ClusterResourceBindings they own for rescheduling.
+func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Cluster) {
+	clusterPolicies, _ := s.clusterPolicyLister.List(labels.Everything())
+	for _, policy := range clusterPolicies {
+		selector := labels.SelectorFromSet(labels.Set{
+			policyv1alpha1.ClusterPropagationPolicyLabel: policy.Name,
+		})
+		affinity := policy.Spec.Placement.ClusterAffinity
+		switch {
+		case affinity == nil:
+			// If no cluster affinity is specified, any cluster change affects the policy, so requeue.
+			fallthrough
+		case util.ClusterMatches(newCluster, *affinity):
+			// If the cluster matches the affinity, requeue the bindings owned by the policy.
+			err := s.requeueClusterResourceBindings(selector)
+			if err != nil {
+				klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)
+			}
+			err = s.requeueResourceBindings(selector)
+			if err != nil {
+				klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
+			}
+		}
+	}
 }
 
 func (s *Scheduler) deleteCluster(obj interface{}) {
diff --git a/test/e2e/rescheduling_test.go b/test/e2e/rescheduling_test.go
index 18f0031cb..e6505adec 100644
--- a/test/e2e/rescheduling_test.go
+++ b/test/e2e/rescheduling_test.go
@@ -25,7 +25,7 @@ import (
 )
 
 // reschedule testing is used to test the rescheduling situation when some initially scheduled clusters are unjoined
-var _ = ginkgo.Describe("reschedule testing", func() {
+var _ = ginkgo.Describe("[cluster unjoined] reschedule testing", func() {
 	framework.SerialContext("Deployment propagation testing", ginkgo.Label(NeedCreateCluster), func() {
 		var newClusterName string
 		var homeDir string
@@ -145,3 +145,182 @@ var _ = ginkgo.Describe("reschedule testing", func() {
 		})
 	})
 })
+
+// reschedule testing is used to test the rescheduling situation when new clusters are joined or recovered
+var _ = ginkgo.Describe("[cluster joined] reschedule testing", func() {
+	framework.SerialContext("Deployment propagation testing", ginkgo.Label(NeedCreateCluster), func() {
+		var newClusterName string
+		var homeDir string
+		var kubeConfigPath string
+		var controlPlane string
+		var clusterContext string
+		var initClusterNames []string
+
+		ginkgo.BeforeEach(func() {
+			newClusterName = "member-e2e-" + rand.String(3)
+			homeDir = os.Getenv("HOME")
+			kubeConfigPath = fmt.Sprintf("%s/.kube/%s.config", homeDir, newClusterName)
+			controlPlane = fmt.Sprintf("%s-control-plane", newClusterName)
+			clusterContext = fmt.Sprintf("kind-%s", newClusterName)
+		})
+
+		ginkgo.BeforeEach(func() {
+			ginkgo.By(fmt.Sprintf("Creating cluster: %s", newClusterName), func() {
+				err := createCluster(newClusterName, kubeConfigPath, controlPlane, clusterContext)
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+			})
+		})
+
+		ginkgo.AfterEach(func() {
+			ginkgo.By(fmt.Sprintf("Unjoining cluster: %s", newClusterName), func() {
+				karmadaConfig := karmadactl.NewKarmadaConfig(clientcmd.NewDefaultPathOptions())
+				opts := karmadactl.CommandUnjoinOption{
+					GlobalCommandOptions: options.GlobalCommandOptions{
+						KubeConfig:     fmt.Sprintf("%s/.kube/karmada.config", os.Getenv("HOME")),
+						KarmadaContext: "karmada-apiserver",
+					},
+					ClusterNamespace: "karmada-cluster",
+					ClusterName:      newClusterName,
+				}
+				err := karmadactl.RunUnjoin(karmadaConfig, opts)
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+			})
+			ginkgo.By(fmt.Sprintf("Deleting cluster: %s", newClusterName), func() {
+				err := deleteCluster(newClusterName, kubeConfigPath)
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+				_ = os.Remove(kubeConfigPath)
+			})
+		})
+
+		var policyNamespace, policyName string
+		var deploymentNamespace, deploymentName string
+		var deployment *appsv1.Deployment
+		var policy *policyv1alpha1.PropagationPolicy
+		ginkgo.Context("testing the ReplicaSchedulingType of the policy is Duplicated", func() {
+			ginkgo.BeforeEach(func() {
+				policyNamespace = testNamespace
+				policyName = deploymentNamePrefix + rand.String(RandomStrLength)
+				deploymentNamespace = testNamespace
+				deploymentName = policyName
+				deployment = testhelper.NewDeployment(deploymentNamespace, deploymentName)
+				deployment.Spec.Replicas = pointer.Int32Ptr(1)
+				// set ReplicaSchedulingType=Duplicated
+				policy = testhelper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
+					{
+						APIVersion: deployment.APIVersion,
+						Kind:       deployment.Kind,
+						Name:       deployment.Name,
+					},
+				}, policyv1alpha1.Placement{
+					ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
+						ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDuplicated,
+					},
+				})
+			})
+
+			ginkgo.It("when the ReplicaSchedulingType of the policy is Duplicated, reschedule testing", func() {
+				ginkgo.By("create deployment and policy")
+				framework.CreatePropagationPolicy(karmadaClient, policy)
+				framework.CreateDeployment(kubeClient, deployment)
+				ginkgo.DeferCleanup(func() {
+					framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name)
+					framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
+				})
+
+				ginkgo.By(fmt.Sprintf("Joining cluster: %s", newClusterName))
+				karmadaConfig := karmadactl.NewKarmadaConfig(clientcmd.NewDefaultPathOptions())
+				opts := karmadactl.CommandJoinOption{
+					DryRun:            false,
+					ClusterNamespace:  "karmada-cluster",
+					ClusterName:       newClusterName,
+					ClusterContext:    clusterContext,
+					ClusterKubeConfig: kubeConfigPath,
+				}
+				err := karmadactl.RunJoin(karmadaConfig, opts)
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+
+				// wait for the new cluster's Ready condition to become True
+				framework.WaitClusterFitWith(controlPlaneClient, newClusterName, func(cluster *clusterv1alpha1.Cluster) bool {
+					return meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
+				})
+
+				ginkgo.By("check whether the deployment is rescheduled to the newly joined cluster")
+				gomega.Eventually(func() bool {
+					targetClusterNames := framework.ExtractTargetClustersFrom(controlPlaneClient, deployment)
+					return !testhelper.IsExclude(newClusterName, targetClusterNames)
+				}, pollTimeout, pollInterval).Should(gomega.BeTrue())
+			})
+		})
+
+		ginkgo.Context("testing clusterAffinity of the policy", func() {
+			ginkgo.BeforeEach(func() {
+				initClusterNames = []string{"member1", "member2", newClusterName}
+				policyNamespace = testNamespace
+				policyName = deploymentNamePrefix + rand.String(RandomStrLength)
+				deploymentNamespace = testNamespace
+				deploymentName = policyName
+				deployment = testhelper.NewDeployment(deploymentNamespace, deploymentName)
+				deployment.Spec.Replicas = pointer.Int32Ptr(1)
+				// set clusterAffinity for Placement
+				policy = testhelper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
+					{
+						APIVersion: deployment.APIVersion,
+						Kind:       deployment.Kind,
+						Name:       deployment.Name,
+					},
+				}, policyv1alpha1.Placement{
+					ClusterAffinity: &policyv1alpha1.ClusterAffinity{ClusterNames: initClusterNames},
+				})
+			})
+			ginkgo.It("when the ReplicaScheduling of the policy is nil, reschedule testing", func() {
+				ginkgo.By("create deployment and policy")
+
+				framework.CreatePropagationPolicy(karmadaClient, policy)
+
+				framework.CreateDeployment(kubeClient, deployment)
+				ginkgo.DeferCleanup(func() {
+					framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name)
+					framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
+				})
+				gomega.Eventually(func() bool {
+					targetClusterNames := framework.ExtractTargetClustersFrom(controlPlaneClient, deployment)
+					return testhelper.IsExclude(newClusterName, targetClusterNames)
+				}, pollTimeout, pollInterval).Should(gomega.BeTrue())
+
+				ginkgo.By(fmt.Sprintf("Joining cluster: %s", newClusterName))
+				karmadaConfig := karmadactl.NewKarmadaConfig(clientcmd.NewDefaultPathOptions())
+				opts := karmadactl.CommandJoinOption{
+					DryRun:            false,
+					ClusterNamespace:  "karmada-cluster",
+					ClusterName:       newClusterName,
+					ClusterContext:    clusterContext,
+					ClusterKubeConfig: kubeConfigPath,
+				}
+				err := karmadactl.RunJoin(karmadaConfig, opts)
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+
+				// wait for the new cluster's Ready condition to become True
+				framework.WaitClusterFitWith(controlPlaneClient, newClusterName, func(cluster *clusterv1alpha1.Cluster) bool {
+					return meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
+				})
+
+				ginkgo.By("check whether the deployment is rescheduled to all clusters in initClusterNames")
+				gomega.Eventually(func() bool {
+					targetClusterNames := framework.ExtractTargetClustersFrom(controlPlaneClient, deployment)
+					for _, clusterName := range initClusterNames {
+						if testhelper.IsExclude(clusterName, targetClusterNames) {
+							return false
+						}
+					}
+					return true
+				}, pollTimeout, pollInterval).Should(gomega.BeTrue())
+
+				gomega.Eventually(func() bool {
+					targetClusterNames := framework.ExtractTargetClustersFrom(controlPlaneClient, deployment)
+					return testhelper.IsExclude("member3", targetClusterNames)
+				}, pollTimeout, pollInterval).Should(gomega.BeTrue())
+
+			})
+		})
+	})
+})
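
Note on the switch/fallthrough pattern used in updateCluster, enqueueAffectedPolicy and enqueueAffectedClusterPolicy: in Go, fallthrough transfers control to the body of the next case without evaluating that case's expression, so a nil ClusterAffinity is treated as "affects every cluster" and the requeue runs without ever dereferencing the nil pointer in util.ClusterMatches. Below is a minimal, self-contained sketch of that control flow only; the names here (affinity, matches, shouldRequeue) are illustrative and not part of Karmada.

package main

import "fmt"

// affinity mimics a policy's optional cluster affinity; nil means "no restriction".
type affinity struct {
	clusters []string
}

// matches reports whether the given cluster name is listed in the affinity.
func (a *affinity) matches(cluster string) bool {
	for _, c := range a.clusters {
		if c == cluster {
			return true
		}
	}
	return false
}

// shouldRequeue mirrors the handler's switch/fallthrough: a nil affinity falls
// straight into the "matched" branch, so matches is never evaluated on a nil pointer.
func shouldRequeue(a *affinity, cluster string) bool {
	switch {
	case a == nil:
		fallthrough
	case a.matches(cluster):
		return true
	}
	return false
}

func main() {
	fmt.Println(shouldRequeue(nil, "member1"))                            // true: no affinity restriction
	fmt.Println(shouldRequeue(&affinity{[]string{"member1"}}, "member1")) // true: cluster is listed
	fmt.Println(shouldRequeue(&affinity{[]string{"member2"}}, "member1")) // false: cluster is not listed
}

The same shape appears in updateCluster itself: a change to only the cluster's labels falls through into the spec-change branch, so either kind of change triggers the policy requeue.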