Merge pull request #823 from dddddai/sched-condition
Propose a Scheduled condition for RB/CRB
commit d2af1e1991
@@ -135,6 +135,12 @@ type AggregatedStatusItem struct {
 	AppliedMessage string `json:"appliedMessage,omitempty"`
 }
 
+// Conditions definition
+const (
+	// Scheduled represents the condition that the ResourceBinding or ClusterResourceBinding has been scheduled.
+	Scheduled string = "Scheduled"
+)
+
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
 
 // ResourceBindingList contains a list of ResourceBinding.
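The new Scheduled type is an ordinary metav1.Condition entry, so any component can gate on it with the apimachinery condition helpers, exactly as the controller and helper changes further down do. A minimal sketch, assuming the usual karmada import paths (the function name is illustrative, not part of this change):

	import (
		"k8s.io/apimachinery/pkg/api/meta"

		workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
	)

	// hasBeenScheduled reports whether the scheduler has already marked the
	// ResourceBinding as scheduled (illustrative helper, not part of this PR).
	func hasBeenScheduled(rb *workv1alpha2.ResourceBinding) bool {
		return meta.IsStatusConditionTrue(rb.Status.Conditions, workv1alpha2.Scheduled)
	}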
@@ -67,7 +67,7 @@ func (c *ResourceBindingController) Reconcile(ctx context.Context, req controlle
 		return c.removeFinalizer(binding)
 	}
 
-	isReady := helper.IsBindingReady(binding.Spec.Clusters)
+	isReady := helper.IsBindingReady(&binding.Status)
 	if !isReady {
 		klog.Infof("ResourceBinding(%s/%s) is not ready to sync", binding.GetNamespace(), binding.GetName())
 		return controllerruntime.Result{}, nil
@@ -67,7 +67,7 @@ func (c *ClusterResourceBindingController) Reconcile(ctx context.Context, req co
 		return c.removeFinalizer(clusterResourceBinding)
 	}
 
-	isReady := helper.IsBindingReady(clusterResourceBinding.Spec.Clusters)
+	isReady := helper.IsBindingReady(&clusterResourceBinding.Status)
 	if !isReady {
 		klog.Infof("ClusterResourceBinding %s is not ready to sync", clusterResourceBinding.GetName())
 		return controllerruntime.Result{}, nil
@@ -61,6 +61,12 @@ const (
 	Unknown ScheduleType = "Unknown"
 )
 
+const (
+	scheduleSuccessReason = "BindingScheduled"
+
+	scheduleSuccessMessage = "the binding has been scheduled"
+)
+
 // Failover indicates if the scheduler should performs re-scheduler in case of cluster failure.
 // TODO(RainbowMango): Remove the temporary solution by introducing feature flag
 var Failover bool
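For reference, the condition these two constants produce on a binding's status would look roughly like the sketch below; the actual assembly happens in the updateBindingStatusIfNeeded and updateClusterBindingStatusIfNeeded helpers added near the end of the scheduler changes in this diff:

	import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	// newScheduledCondition mirrors what the scheduler reports once placement
	// succeeds (sketch only; the PR builds this literal inline).
	func newScheduledCondition() metav1.Condition {
		return metav1.Condition{
			Type:    "Scheduled",                      // workv1alpha2.Scheduled
			Status:  metav1.ConditionTrue,
			Reason:  "BindingScheduled",               // scheduleSuccessReason
			Message: "the binding has been scheduled", // scheduleSuccessMessage
		}
	}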
@@ -472,11 +478,11 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
 	}
 	binding.Annotations[util.PolicyPlacementAnnotation] = placementStr
 
-	_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
+	binding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
 	if err != nil {
 		return err
 	}
-	return nil
+	return s.updateBindingStatusIfNeeded(binding)
 }
 
 func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, policy *policyv1alpha1.ClusterPropagationPolicy) (err error) {
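The change in this hunk, repeated in the five analogous hunks below, is that the object returned by Update is now kept instead of discarded, and the function finishes by writing the Scheduled condition. Keeping the returned object presumably lets the follow-up status write operate on the copy carrying the resourceVersion assigned by the spec/annotation update, avoiding an immediate conflict. A simplified sketch of the shared pattern (identifier names are illustrative):

	// updated carries the resourceVersion assigned by the Update call, so the
	// UpdateStatus issued inside updateBindingStatusIfNeeded does not conflict
	// with the update that was just made.
	updated, err := s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).
		Update(context.TODO(), binding, metav1.UpdateOptions{})
	if err != nil {
		return err
	}
	return s.updateBindingStatusIfNeeded(updated)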
@@ -501,11 +507,11 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
 	}
 	binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)
 
-	_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
+	binding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
 	if err != nil {
 		return err
 	}
-	return nil
+	return s.updateClusterBindingStatusIfNeeded(binding)
 }
 
 func (s *Scheduler) handleErr(err error, key interface{}) {
@@ -674,11 +680,11 @@ func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *wor
 	clusterResourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
 	klog.Infof("The final binding.Spec.Cluster values are: %v\n", clusterResourceBinding.Spec.Clusters)
 
-	_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
+	clusterResourceBinding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
 	if err != nil {
 		return err
 	}
-	return nil
+	return s.updateClusterBindingStatusIfNeeded(clusterResourceBinding)
 }
 
 func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding) error {
@@ -699,11 +705,11 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.Reso
 	resourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
 	klog.Infof("The final binding.Spec.Cluster values are: %v\n", resourceBinding.Spec.Clusters)
 
-	_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
+	resourceBinding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
 	if err != nil {
 		return err
 	}
-	return nil
+	return s.updateBindingStatusIfNeeded(resourceBinding)
 }
 
 func (s *Scheduler) scaleScheduleOne(key string) (err error) {
@@ -761,11 +767,11 @@ func (s *Scheduler) scaleScheduleResourceBinding(resourceBinding *workv1alpha2.R
 	}
 	binding.Annotations[util.PolicyPlacementAnnotation] = placementStr
 
-	_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
+	binding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
 	if err != nil {
 		return err
 	}
-	return nil
+	return s.updateBindingStatusIfNeeded(binding)
 }
 
 func (s *Scheduler) scaleScheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding,
@@ -792,11 +798,11 @@ func (s *Scheduler) scaleScheduleClusterResourceBinding(clusterResourceBinding *
 	}
 	binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)
 
-	_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
+	binding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
 	if err != nil {
 		return err
 	}
-	return nil
+	return s.updateClusterBindingStatusIfNeeded(binding)
 }
 
 func (s *Scheduler) getTypeFromResourceBindings(ns, name string) ScheduleType {
@@ -903,3 +909,47 @@ func (s *Scheduler) establishEstimatorConnections() {
 		}
 	}
 }
+
+// updateBindingStatusIfNeeded sets the scheduled condition of ResourceBinding to true if needed
+func (s *Scheduler) updateBindingStatusIfNeeded(rb *workv1alpha2.ResourceBinding) error {
+	oldScheduledCondition := meta.FindStatusCondition(rb.Status.Conditions, workv1alpha2.Scheduled)
+	newScheduledCondition := metav1.Condition{
+		Type:    workv1alpha2.Scheduled,
+		Status:  metav1.ConditionTrue,
+		Reason:  scheduleSuccessReason,
+		Message: scheduleSuccessMessage,
+	}
+	if equality.Semantic.DeepEqual(oldScheduledCondition, newScheduledCondition) {
+		return nil
+	}
+
+	meta.SetStatusCondition(&rb.Status.Conditions, newScheduledCondition)
+	_, err := s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).UpdateStatus(context.TODO(), rb, metav1.UpdateOptions{})
+	if err != nil {
+		klog.Errorf("Failed to update ResourceBinding status(%s/%s): %v", rb.Namespace, rb.Name, err)
+		return err
+	}
+	return nil
+}
+
+// updateClusterBindingStatusIfNeeded sets the scheduled condition of ClusterResourceBinding to true if needed
+func (s *Scheduler) updateClusterBindingStatusIfNeeded(crb *workv1alpha2.ClusterResourceBinding) error {
+	oldScheduledCondition := meta.FindStatusCondition(crb.Status.Conditions, workv1alpha2.Scheduled)
+	newScheduledCondition := metav1.Condition{
+		Type:    workv1alpha2.Scheduled,
+		Status:  metav1.ConditionTrue,
+		Reason:  scheduleSuccessReason,
+		Message: scheduleSuccessMessage,
+	}
+	if equality.Semantic.DeepEqual(oldScheduledCondition, newScheduledCondition) {
+		return nil
+	}
+
+	meta.SetStatusCondition(&crb.Status.Conditions, newScheduledCondition)
+	_, err := s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().UpdateStatus(context.TODO(), crb, metav1.UpdateOptions{})
+	if err != nil {
+		klog.Errorf("Failed to update ClusterResourceBinding status(%s): %v", crb.Name, err)
+		return err
+	}
+	return nil
+}
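Both helpers lean on the condition utilities from k8s.io/apimachinery/pkg/api/meta. A minimal standalone sketch of how those utilities behave (not part of this change):

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/api/meta"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	func main() {
		var conditions []metav1.Condition

		// SetStatusCondition appends the condition when the type is absent and
		// stamps LastTransitionTime; on later calls it updates Status, Reason and
		// Message in place and only refreshes LastTransitionTime when Status flips.
		meta.SetStatusCondition(&conditions, metav1.Condition{
			Type:    "Scheduled",
			Status:  metav1.ConditionTrue,
			Reason:  "BindingScheduled",
			Message: "the binding has been scheduled",
		})

		// FindStatusCondition returns a pointer into the slice, or nil if absent.
		fmt.Println(meta.FindStatusCondition(conditions, "Scheduled") != nil) // true
		fmt.Println(meta.IsStatusConditionTrue(conditions, "Scheduled"))      // true
	}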
@@ -57,8 +57,8 @@ func SortClusterByWeight(m map[string]int64) ClusterWeightInfoList {
 }
 
 // IsBindingReady will check if resourceBinding/clusterResourceBinding is ready to build Work.
-func IsBindingReady(targetClusters []workv1alpha2.TargetCluster) bool {
-	return len(targetClusters) != 0
+func IsBindingReady(status *workv1alpha2.ResourceBindingStatus) bool {
+	return meta.IsStatusConditionTrue(status.Conditions, workv1alpha2.Scheduled)
 }
 
 // HasScheduledReplica checks if the scheduler has assigned replicas for each cluster.
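With this change a binding counts as ready only once the scheduler has set the Scheduled condition to True, rather than as soon as spec.clusters is non-empty. A small sketch of the new behavior with a hand-built status (import paths assume the usual karmada repository layout):

	package main

	import (
		"fmt"

		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

		workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
		"github.com/karmada-io/karmada/pkg/util/helper"
	)

	func main() {
		// No Scheduled condition yet: not ready, even if clusters were already assigned.
		fmt.Println(helper.IsBindingReady(&workv1alpha2.ResourceBindingStatus{})) // false

		// Scheduled=True: ready to build Work objects.
		scheduled := &workv1alpha2.ResourceBindingStatus{
			Conditions: []metav1.Condition{{
				Type:   workv1alpha2.Scheduled,
				Status: metav1.ConditionTrue,
				Reason: "BindingScheduled",
			}},
		}
		fmt.Println(helper.IsBindingReady(scheduled)) // true
	}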
@@ -152,6 +152,15 @@ var _ = ginkgo.Describe("failover testing", func() {
 				fmt.Printf("reschedule in %d target cluster\n", totalNum)
 			})
 
+			ginkgo.By("check if the scheduled condition is true", func() {
+				err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
+					rb, err := getResourceBinding(deployment)
+					gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+					return meta.IsStatusConditionTrue(rb.Status.Conditions, workv1alpha2.Scheduled), nil
+				})
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+			})
+
 			ginkgo.By("recover not ready cluster", func() {
 				for _, disabledCluster := range disabledClusters {
 					fmt.Printf("cluster %s is waiting for recovering\n", disabledCluster.Name)
@@ -9,6 +9,7 @@ import (
 	"github.com/onsi/gomega"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -92,6 +93,15 @@ var _ = ginkgo.Describe("propagation with label and group constraints testing",
 				gomega.Expect(len(targetClusterNames) == minGroups).ShouldNot(gomega.BeFalse())
 			})
 
+			ginkgo.By("check if the scheduled condition is true", func() {
+				err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
+					rb, err := getResourceBinding(deployment)
+					gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+					return meta.IsStatusConditionTrue(rb.Status.Conditions, workv1alpha2.Scheduled), nil
+				})
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+			})
+
 			ginkgo.By("check if deployment present on right clusters", func() {
 				for _, targetClusterName := range targetClusterNames {
 					clusterClient := getClusterClient(targetClusterName)
@@ -266,6 +276,15 @@ var _ = ginkgo.Describe("propagation with label and group constraints testing",
 				fmt.Printf("target clusters in cluster resource binding are %s\n", targetClusterNames)
 			})
 
+			ginkgo.By("check if the scheduled condition is true", func() {
+				err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
+					crb, err := getClusterResourceBinding(crd)
+					gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+					return meta.IsStatusConditionTrue(crb.Status.Conditions, workv1alpha2.Scheduled), nil
+				})
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+			})
+
 			ginkgo.By("check if crd present on right clusters", func() {
 				for _, targetClusterName := range targetClusterNames {
 					clusterDynamicClient := getClusterDynamicClient(targetClusterName)
@@ -864,3 +883,27 @@ var _ = ginkgo.Describe("[ReplicaScheduling] ReplicaSchedulingStrategy testing",
 		})
 	})
 })
+
+// get the resource binding associated with the workload
+func getResourceBinding(workload interface{}) (*workv1alpha2.ResourceBinding, error) {
+	uncastObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(workload)
+	gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+	obj := unstructured.Unstructured{Object: uncastObj}
+	bindingName := names.GenerateBindingName(obj.GetKind(), obj.GetName())
+	binding := &workv1alpha2.ResourceBinding{}
+
+	err = controlPlaneClient.Get(context.TODO(), client.ObjectKey{Namespace: obj.GetNamespace(), Name: bindingName}, binding)
+	return binding, err
+}
+
+// get the cluster resource binding associated with the workload
+func getClusterResourceBinding(workload interface{}) (*workv1alpha2.ClusterResourceBinding, error) {
+	uncastObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(workload)
+	gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+	obj := unstructured.Unstructured{Object: uncastObj}
+	bindingName := names.GenerateBindingName(obj.GetKind(), obj.GetName())
+	binding := &workv1alpha2.ClusterResourceBinding{}
+
+	err = controlPlaneClient.Get(context.TODO(), client.ObjectKey{Name: bindingName}, binding)
+	return binding, err
+}