diff --git a/pkg/apis/work/v1alpha1/work_types.go b/pkg/apis/work/v1alpha1/work_types.go index 8abda2170..f1e03ee3e 100644 --- a/pkg/apis/work/v1alpha1/work_types.go +++ b/pkg/apis/work/v1alpha1/work_types.go @@ -152,6 +152,8 @@ const ( // WorkDegraded represents that the current state of Work does not match // the desired state for a certain period. WorkDegraded string = "Degraded" + // WorkDispatching represents the dispatching or suspension status of the Work resource + WorkDispatching string = "Dispatching" ) // ResourceHealth represents that the health status of the reference resource. diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go index 481fa73a8..0f1b5906f 100644 --- a/pkg/controllers/execution/execution_controller.go +++ b/pkg/controllers/execution/execution_controller.go @@ -30,7 +30,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" - "k8s.io/utils/ptr" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -53,6 +52,10 @@ import ( const ( // ControllerName is the controller name that will be used when reporting events. ControllerName = "execution-controller" + // workSuspendDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is suspended. + workSuspendDispatchingConditionReason = "SuspendDispatching" + // workDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is not suspended. + workDispatchingConditionReason = "Dispatching" ) // Controller is to sync Work. @@ -94,7 +97,12 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques return controllerruntime.Result{}, err } - if ptr.Deref(work.Spec.SuspendDispatching, false) { + if err := c.updateWorkDispatchingConditionIfNeeded(ctx, work); err != nil { + klog.Errorf("Failed to update work condition type %s. err is %v", workv1alpha1.WorkDispatching, err) + return controllerruntime.Result{}, err + } + + if helper.IsWorkSuspendDispatching(work) { klog.V(4).Infof("Skip syncing work(%s/%s) for cluster(%s) as work dispatch is suspended.", work.Namespace, work.Name, cluster.Name) return controllerruntime.Result{}, nil } @@ -209,7 +217,7 @@ func (c *Controller) syncToClusters(ctx context.Context, clusterName string, wor if len(errs) > 0 { total := len(work.Spec.Workload.Manifests) message := fmt.Sprintf("Failed to apply all manifests (%d/%d): %s", syncSucceedNum, total, errors.NewAggregate(errs).Error()) - err := c.updateAppliedCondition(work, metav1.ConditionFalse, "AppliedFailed", message) + err := c.updateAppliedCondition(ctx, work, metav1.ConditionFalse, "AppliedFailed", message) if err != nil { klog.Errorf("Failed to update applied status for given work %v, namespace is %v, err is %v", work.Name, work.Namespace, err) errs = append(errs, err) @@ -217,7 +225,7 @@ func (c *Controller) syncToClusters(ctx context.Context, clusterName string, wor return errors.NewAggregate(errs) } - err := c.updateAppliedCondition(work, metav1.ConditionTrue, "AppliedSuccessful", "Manifest has been successfully applied") + err := c.updateAppliedCondition(ctx, work, metav1.ConditionTrue, "AppliedSuccessful", "Manifest has been successfully applied") if err != nil { klog.Errorf("Failed to update applied status for given work %v, namespace is %v, err is %v", work.Name, work.Namespace, err) return err @@ -253,8 +261,31 @@ func (c *Controller) tryCreateOrUpdateWorkload(ctx context.Context, clusterName return nil } +func (c *Controller) updateWorkDispatchingConditionIfNeeded(ctx context.Context, work *workv1alpha1.Work) error { + newWorkDispatchingCondition := metav1.Condition{ + Type: workv1alpha1.WorkDispatching, + LastTransitionTime: metav1.Now(), + } + + if helper.IsWorkSuspendDispatching(work) { + newWorkDispatchingCondition.Status = metav1.ConditionFalse + newWorkDispatchingCondition.Reason = workSuspendDispatchingConditionReason + newWorkDispatchingCondition.Message = "Work dispatching is in a suspended state." + } else { + newWorkDispatchingCondition.Status = metav1.ConditionTrue + newWorkDispatchingCondition.Reason = workDispatchingConditionReason + newWorkDispatchingCondition.Message = "Work is being dispatched to member clusters." + } + + if meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, newWorkDispatchingCondition.Type, newWorkDispatchingCondition.Status) { + return nil + } + + return c.setStatusCondition(ctx, work, newWorkDispatchingCondition) +} + // updateAppliedCondition updates the applied condition for the given Work. -func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) error { +func (c *Controller) updateAppliedCondition(ctx context.Context, work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) error { newWorkAppliedCondition := metav1.Condition{ Type: workv1alpha1.WorkApplied, Status: status, @@ -263,9 +294,13 @@ func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status meta LastTransitionTime: metav1.Now(), } + return c.setStatusCondition(ctx, work, newWorkAppliedCondition) +} + +func (c *Controller) setStatusCondition(ctx context.Context, work *workv1alpha1.Work, statusCondition metav1.Condition) error { return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - _, err = helper.UpdateStatus(context.Background(), c.Client, work, func() error { - meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition) + _, err = helper.UpdateStatus(ctx, c.Client, work, func() error { + meta.SetStatusCondition(&work.Status.Conditions, statusCondition) return nil }) return err diff --git a/pkg/controllers/execution/execution_controller_test.go b/pkg/controllers/execution/execution_controller_test.go index d2b6bc679..5c8500a3c 100644 --- a/pkg/controllers/execution/execution_controller_test.go +++ b/pkg/controllers/execution/execution_controller_test.go @@ -20,11 +20,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/utils/ptr" controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" @@ -32,41 +33,52 @@ import ( "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/gclient" "github.com/karmada-io/karmada/pkg/util/helper" + testhelper "github.com/karmada-io/karmada/test/helper" ) func TestExecutionController_Reconcile(t *testing.T) { tests := []struct { - name string - c Controller - work *workv1alpha1.Work - ns string - expectRes controllerruntime.Result - existErr bool + name string + work *workv1alpha1.Work + ns string + expectRes controllerruntime.Result + expectCondition *metav1.Condition + existErr bool }{ { name: "work dispatching is suspended, no error, no apply", - c: newController(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)), - work: &workv1alpha1.Work{ - ObjectMeta: metav1.ObjectMeta{ - Name: "work", - Namespace: "karmada-es-cluster", - }, - Spec: workv1alpha1.WorkSpec{ - SuspendDispatching: ptr.To(true), - }, - Status: workv1alpha1.WorkStatus{ - Conditions: []metav1.Condition{ - { - Type: workv1alpha1.WorkApplied, - Status: metav1.ConditionTrue, - }, - }, - }, - }, + work: newWork(func(work *workv1alpha1.Work) { + work.Spec.SuspendDispatching = ptr.To(true) + }), ns: "karmada-es-cluster", expectRes: controllerruntime.Result{}, existErr: false, }, + { + name: "work dispatching is suspended, adds false dispatching condition", + work: newWork(func(w *workv1alpha1.Work) { + w.Spec.SuspendDispatching = ptr.To(true) + }), + ns: "karmada-es-cluster", + expectRes: controllerruntime.Result{}, + expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse}, + existErr: false, + }, + { + name: "work dispatching is suspended, overwrites existing dispatching condition", + work: newWork(func(w *workv1alpha1.Work) { + w.Spec.SuspendDispatching = ptr.To(true) + meta.SetStatusCondition(&w.Status.Conditions, metav1.Condition{ + Type: workv1alpha1.WorkDispatching, + Status: metav1.ConditionTrue, + Reason: workDispatchingConditionReason, + }) + }), + ns: "karmada-es-cluster", + expectRes: controllerruntime.Result{}, + expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse}, + existErr: false, + }, } for _, tt := range tests { @@ -78,29 +90,40 @@ func TestExecutionController_Reconcile(t *testing.T) { }, } - if err := tt.c.Client.Create(context.Background(), tt.work); err != nil { - t.Fatalf("Failed to create cluster: %v", err) - } - - res, err := tt.c.Reconcile(context.Background(), req) + c := newController(tt.work) + res, err := c.Reconcile(context.Background(), req) assert.Equal(t, tt.expectRes, res) if tt.existErr { - assert.NotEmpty(t, err) + assert.Error(t, err) } else { - assert.Empty(t, err) + assert.NoError(t, err) + } + + if tt.expectCondition != nil { + assert.NoError(t, c.Client.Get(context.Background(), req.NamespacedName, tt.work)) + assert.True(t, meta.IsStatusConditionPresentAndEqual(tt.work.Status.Conditions, tt.expectCondition.Type, tt.expectCondition.Status)) } }) } } -func newController(objects ...client.Object) Controller { +func newController(work *workv1alpha1.Work) Controller { + cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) return Controller{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(objects...).Build(), + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work).WithStatusSubresource(work).Build(), InformerManager: genericmanager.GetInstance(), PredicateFunc: helper.NewClusterPredicateOnAgent("test"), } } +func newWork(applyFunc func(work *workv1alpha1.Work)) *workv1alpha1.Work { + work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), nil) + if applyFunc != nil { + applyFunc(work) + } + return work +} + func newCluster(name string, clusterType string, clusterStatus metav1.ConditionStatus) *clusterv1alpha1.Cluster { return &clusterv1alpha1.Cluster{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/util/helper/work.go b/pkg/util/helper/work.go index c06aff201..c5956270e 100644 --- a/pkg/util/helper/work.go +++ b/pkg/util/helper/work.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -173,3 +174,8 @@ func IsWorkContains(manifests []workv1alpha1.Manifest, targetResource schema.Gro } return false } + +// IsWorkSuspendDispatching checks if the work is suspended from dispatching. +func IsWorkSuspendDispatching(work *workv1alpha1.Work) bool { + return ptr.Deref(work.Spec.SuspendDispatching, false) +} diff --git a/test/e2e/clusterpropagationpolicy_test.go b/test/e2e/clusterpropagationpolicy_test.go index 8c39b81a1..ad78ae1d8 100644 --- a/test/e2e/clusterpropagationpolicy_test.go +++ b/test/e2e/clusterpropagationpolicy_test.go @@ -26,6 +26,7 @@ import ( appsv1 "k8s.io/api/apps/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -34,7 +35,9 @@ import ( "k8s.io/utils/ptr" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/test/e2e/framework" testhelper "github.com/karmada-io/karmada/test/helper" @@ -1079,7 +1082,18 @@ var _ = ginkgo.Describe("[Suspend] clusterPropagation testing", func() { if err != nil { return false } - return work != nil && ptr.Deref(work.Spec.SuspendDispatching, false) + return work != nil && helper.IsWorkSuspendDispatching(work) + }, pollTimeout, pollInterval).Should(gomega.Equal(true)) + }) + + ginkgo.It("adds suspend dispatching condition to Work", func() { + esName := names.GenerateExecutionSpaceName(targetMember) + gomega.Eventually(func() bool { + work, err := karmadaClient.WorkV1alpha1().Works(esName).Get(context.TODO(), workName, metav1.GetOptions{}) + if err != nil { + return false + } + return work != nil && meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, workv1alpha1.WorkDispatching, metav1.ConditionFalse) }, pollTimeout, pollInterval).Should(gomega.Equal(true)) }) }) diff --git a/test/e2e/propagationpolicy_test.go b/test/e2e/propagationpolicy_test.go index e23f2b606..6f8dcd48e 100644 --- a/test/e2e/propagationpolicy_test.go +++ b/test/e2e/propagationpolicy_test.go @@ -30,6 +30,7 @@ import ( rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -41,7 +42,9 @@ import ( "k8s.io/utils/ptr" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/test/e2e/framework" testhelper "github.com/karmada-io/karmada/test/helper" @@ -1163,7 +1166,19 @@ var _ = ginkgo.Describe("[Suspend] PropagationPolicy testing", func() { if err != nil { return false } - return work != nil && ptr.Deref(work.Spec.SuspendDispatching, false) + return work != nil && helper.IsWorkSuspendDispatching(work) + }, pollTimeout, pollInterval).Should(gomega.Equal(true)) + }) + + ginkgo.It("adds suspend dispatching condition to Work\"", func() { + workName := names.GenerateWorkName(deployment.Kind, deployment.Name, deployment.Namespace) + esName := names.GenerateExecutionSpaceName(targetMember) + gomega.Eventually(func() bool { + work, err := karmadaClient.WorkV1alpha1().Works(esName).Get(context.TODO(), workName, metav1.GetOptions{}) + if err != nil { + return false + } + return work != nil && meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, workv1alpha1.WorkDispatching, metav1.ConditionFalse) }, pollTimeout, pollInterval).Should(gomega.Equal(true)) }) })