Merge pull request #5317 from a7i/work-suspend-work-status-conditions

work suspension: work status conditions
karmada-bot 2024-08-09 14:45:29 +08:00 committed by GitHub
commit fa05f9b600
6 changed files with 138 additions and 43 deletions

View File

@@ -152,6 +152,8 @@ const (
// WorkDegraded represents that the current state of Work does not match
// the desired state for a certain period.
WorkDegraded string = "Degraded"
// WorkDispatching represents the dispatching or suspension status of the Work resource.
WorkDispatching string = "Dispatching"
)
// ResourceHealth represents the health status of the referenced resource.
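Downstream consumers can query the new WorkDispatching condition with the standard apimachinery condition helpers. A minimal sketch, assuming the upstream import paths; isDispatchingSuspended is an illustrative helper, not part of this change:

import (
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
)

// isDispatchingSuspended reports whether the Work's WorkDispatching
// condition is explicitly False, i.e. dispatching has been suspended.
func isDispatchingSuspended(work *workv1alpha1.Work) bool {
	return meta.IsStatusConditionPresentAndEqual(
		work.Status.Conditions, workv1alpha1.WorkDispatching, metav1.ConditionFalse)
}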

View File

@@ -30,7 +30,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -53,6 +52,10 @@ import (
const (
// ControllerName is the controller name that will be used when reporting events.
ControllerName = "execution-controller"
// workSuspendDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is suspended.
workSuspendDispatchingConditionReason = "SuspendDispatching"
// workDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is not suspended.
workDispatchingConditionReason = "Dispatching"
)
// Controller is responsible for syncing Work.
@@ -94,7 +97,12 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
return controllerruntime.Result{}, err
}
if ptr.Deref(work.Spec.SuspendDispatching, false) {
if err := c.updateWorkDispatchingConditionIfNeeded(ctx, work); err != nil {
klog.Errorf("Failed to update work condition type %s. err is %v", workv1alpha1.WorkDispatching, err)
return controllerruntime.Result{}, err
}
if helper.IsWorkSuspendDispatching(work) {
klog.V(4).Infof("Skip syncing work(%s/%s) for cluster(%s) as work dispatch is suspended.", work.Namespace, work.Name, cluster.Name)
return controllerruntime.Result{}, nil
}
@@ -209,7 +217,7 @@ func (c *Controller) syncToClusters(ctx context.Context, clusterName string, wor
if len(errs) > 0 {
total := len(work.Spec.Workload.Manifests)
message := fmt.Sprintf("Failed to apply all manifests (%d/%d): %s", syncSucceedNum, total, errors.NewAggregate(errs).Error())
err := c.updateAppliedCondition(work, metav1.ConditionFalse, "AppliedFailed", message)
err := c.updateAppliedCondition(ctx, work, metav1.ConditionFalse, "AppliedFailed", message)
if err != nil {
klog.Errorf("Failed to update applied status for given work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
errs = append(errs, err)
@@ -217,7 +225,7 @@
return errors.NewAggregate(errs)
}
err := c.updateAppliedCondition(work, metav1.ConditionTrue, "AppliedSuccessful", "Manifest has been successfully applied")
err := c.updateAppliedCondition(ctx, work, metav1.ConditionTrue, "AppliedSuccessful", "Manifest has been successfully applied")
if err != nil {
klog.Errorf("Failed to update applied status for given work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return err
@@ -253,8 +261,31 @@ func (c *Controller) tryCreateOrUpdateWorkload(ctx context.Context, clusterName
return nil
}
func (c *Controller) updateWorkDispatchingConditionIfNeeded(ctx context.Context, work *workv1alpha1.Work) error {
newWorkDispatchingCondition := metav1.Condition{
Type: workv1alpha1.WorkDispatching,
LastTransitionTime: metav1.Now(),
}
if helper.IsWorkSuspendDispatching(work) {
newWorkDispatchingCondition.Status = metav1.ConditionFalse
newWorkDispatchingCondition.Reason = workSuspendDispatchingConditionReason
newWorkDispatchingCondition.Message = "Work dispatching is in a suspended state."
} else {
newWorkDispatchingCondition.Status = metav1.ConditionTrue
newWorkDispatchingCondition.Reason = workDispatchingConditionReason
newWorkDispatchingCondition.Message = "Work is being dispatched to member clusters."
}
if meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, newWorkDispatchingCondition.Type, newWorkDispatchingCondition.Status) {
return nil
}
return c.setStatusCondition(ctx, work, newWorkDispatchingCondition)
}
// updateAppliedCondition updates the applied condition for the given Work.
func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) error {
func (c *Controller) updateAppliedCondition(ctx context.Context, work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) error {
newWorkAppliedCondition := metav1.Condition{
Type: workv1alpha1.WorkApplied,
Status: status,
@@ -263,9 +294,13 @@ func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status meta
LastTransitionTime: metav1.Now(),
}
return c.setStatusCondition(ctx, work, newWorkAppliedCondition)
}
func (c *Controller) setStatusCondition(ctx context.Context, work *workv1alpha1.Work, statusCondition metav1.Condition) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
_, err = helper.UpdateStatus(context.Background(), c.Client, work, func() error {
meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition)
_, err = helper.UpdateStatus(ctx, c.Client, work, func() error {
meta.SetStatusCondition(&work.Status.Conditions, statusCondition)
return nil
})
return err
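setStatusCondition wraps the write in retry.RetryOnConflict, the standard optimistic-concurrency pattern: on a 409 conflict the mutation closure runs again so the condition is applied to the latest copy of the object. A self-contained sketch of the same pattern against a bare controller-runtime client (illustrative, not the controller's actual helper):

import (
	"context"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"

	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
)

// setCondition retries the status write on conflicts, refetching the Work
// before each attempt so the mutation applies to the freshest copy.
func setCondition(ctx context.Context, c client.Client, work *workv1alpha1.Work, cond metav1.Condition) error {
	key := client.ObjectKeyFromObject(work)
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		if err := c.Get(ctx, key, work); err != nil {
			return err
		}
		// SetStatusCondition only bumps LastTransitionTime when the
		// condition's Status actually changes.
		meta.SetStatusCondition(&work.Status.Conditions, cond)
		return c.Status().Update(ctx, work)
	})
}

Note that updateWorkDispatchingConditionIfNeeded above short-circuits with meta.IsStatusConditionPresentAndEqual before writing, so repeated reconciles of an already-suspended Work produce no status update at all.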

View File

@@ -20,11 +20,12 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@@ -32,41 +33,52 @@ import (
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
testhelper "github.com/karmada-io/karmada/test/helper"
)
func TestExecutionController_Reconcile(t *testing.T) {
tests := []struct {
name string
c Controller
work *workv1alpha1.Work
ns string
expectRes controllerruntime.Result
existErr bool
name string
work *workv1alpha1.Work
ns string
expectRes controllerruntime.Result
expectCondition *metav1.Condition
existErr bool
}{
{
name: "work dispatching is suspended, no error, no apply",
c: newController(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)),
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: "work",
Namespace: "karmada-es-cluster",
},
Spec: workv1alpha1.WorkSpec{
SuspendDispatching: ptr.To(true),
},
Status: workv1alpha1.WorkStatus{
Conditions: []metav1.Condition{
{
Type: workv1alpha1.WorkApplied,
Status: metav1.ConditionTrue,
},
},
},
},
work: newWork(func(work *workv1alpha1.Work) {
work.Spec.SuspendDispatching = ptr.To(true)
}),
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
},
{
name: "work dispatching is suspended, adds false dispatching condition",
work: newWork(func(w *workv1alpha1.Work) {
w.Spec.SuspendDispatching = ptr.To(true)
}),
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse},
existErr: false,
},
{
name: "work dispatching is suspended, overwrites existing dispatching condition",
work: newWork(func(w *workv1alpha1.Work) {
w.Spec.SuspendDispatching = ptr.To(true)
meta.SetStatusCondition(&w.Status.Conditions, metav1.Condition{
Type: workv1alpha1.WorkDispatching,
Status: metav1.ConditionTrue,
Reason: workDispatchingConditionReason,
})
}),
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse},
existErr: false,
},
}
for _, tt := range tests {
@@ -78,29 +90,40 @@ func TestExecutionController_Reconcile(t *testing.T) {
},
}
if err := tt.c.Client.Create(context.Background(), tt.work); err != nil {
t.Fatalf("Failed to create cluster: %v", err)
}
res, err := tt.c.Reconcile(context.Background(), req)
c := newController(tt.work)
res, err := c.Reconcile(context.Background(), req)
assert.Equal(t, tt.expectRes, res)
if tt.existErr {
assert.NotEmpty(t, err)
assert.Error(t, err)
} else {
assert.Empty(t, err)
assert.NoError(t, err)
}
if tt.expectCondition != nil {
assert.NoError(t, c.Client.Get(context.Background(), req.NamespacedName, tt.work))
assert.True(t, meta.IsStatusConditionPresentAndEqual(tt.work.Status.Conditions, tt.expectCondition.Type, tt.expectCondition.Status))
}
})
}
}
func newController(objects ...client.Object) Controller {
func newController(work *workv1alpha1.Work) Controller {
cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
return Controller{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(objects...).Build(),
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work).WithStatusSubresource(work).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
}
}
func newWork(applyFunc func(work *workv1alpha1.Work)) *workv1alpha1.Work {
work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), nil)
if applyFunc != nil {
applyFunc(work)
}
return work
}
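// Note (illustrative, not part of this change): the builder in newController
// above registers the Work object via WithStatusSubresource. Recent
// controller-runtime fake clients reject Status().Update() for objects not
// registered this way (exact behavior varies by controller-runtime version),
// so without it the conditions written by the controller would not persist.
// A minimal sketch of the same setup, assuming gclient.NewSchema() registers
// the Work types as the test above does:
//
//	fakeClient := fake.NewClientBuilder().
//		WithScheme(gclient.NewSchema()).
//		WithObjects(work).
//		WithStatusSubresource(work).
//		Build()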
func newCluster(name string, clusterType string, clusterStatus metav1.ConditionStatus) *clusterv1alpha1.Cluster {
return &clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{

View File

@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -173,3 +174,8 @@ func IsWorkContains(manifests []workv1alpha1.Manifest, targetResource schema.Gro
}
return false
}
// IsWorkSuspendDispatching checks if the work is suspended from dispatching.
func IsWorkSuspendDispatching(work *workv1alpha1.Work) bool {
return ptr.Deref(work.Spec.SuspendDispatching, false)
}
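The helper centralizes the nil-safe read of the optional *bool field: ptr.Deref falls back to false when SuspendDispatching is unset. A small runnable sketch of those semantics:

package main

import (
	"fmt"

	"k8s.io/utils/ptr"

	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
	"github.com/karmada-io/karmada/pkg/util/helper"
)

func main() {
	// Unset pointer defaults to false: dispatching proceeds.
	w := &workv1alpha1.Work{}
	fmt.Println(helper.IsWorkSuspendDispatching(w)) // false

	// Explicitly suspended.
	w.Spec.SuspendDispatching = ptr.To(true)
	fmt.Println(helper.IsWorkSuspendDispatching(w)) // true
}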

View File

@@ -26,6 +26,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
@@ -34,7 +35,9 @@ import (
"k8s.io/utils/ptr"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/test/e2e/framework"
testhelper "github.com/karmada-io/karmada/test/helper"
@@ -1079,7 +1082,18 @@ var _ = ginkgo.Describe("[Suspend] clusterPropagation testing", func() {
if err != nil {
return false
}
return work != nil && ptr.Deref(work.Spec.SuspendDispatching, false)
return work != nil && helper.IsWorkSuspendDispatching(work)
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
})
ginkgo.It("adds suspend dispatching condition to Work", func() {
esName := names.GenerateExecutionSpaceName(targetMember)
gomega.Eventually(func() bool {
work, err := karmadaClient.WorkV1alpha1().Works(esName).Get(context.TODO(), workName, metav1.GetOptions{})
if err != nil {
return false
}
return work != nil && meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, workv1alpha1.WorkDispatching, metav1.ConditionFalse)
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
})
})
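These specs only observe the condition; the suspension itself is driven by the propagation policy under test. For illustration, a hedged sketch of flipping the field directly with the generated clientset; the clientset import path and the suspendDispatching JSON tag are assumptions inferred from the identifiers in this diff:

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"

	karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
)

// suspendWork sets spec.suspendDispatching=true on an existing Work via a
// JSON merge patch. The field tag is assumed from the Go field name; verify
// it against the v1alpha1 types before relying on it.
func suspendWork(ctx context.Context, c karmadaclientset.Interface, namespace, name string) error {
	patch := []byte(`{"spec":{"suspendDispatching":true}}`)
	_, err := c.WorkV1alpha1().Works(namespace).Patch(
		ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}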

View File

@@ -30,6 +30,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
@@ -41,7 +42,9 @@ import (
"k8s.io/utils/ptr"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/test/e2e/framework"
testhelper "github.com/karmada-io/karmada/test/helper"
@@ -1163,7 +1166,19 @@ var _ = ginkgo.Describe("[Suspend] PropagationPolicy testing", func() {
if err != nil {
return false
}
return work != nil && ptr.Deref(work.Spec.SuspendDispatching, false)
return work != nil && helper.IsWorkSuspendDispatching(work)
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
})
ginkgo.It("adds suspend dispatching condition to Work\"", func() {
workName := names.GenerateWorkName(deployment.Kind, deployment.Name, deployment.Namespace)
esName := names.GenerateExecutionSpaceName(targetMember)
gomega.Eventually(func() bool {
work, err := karmadaClient.WorkV1alpha1().Works(esName).Get(context.TODO(), workName, metav1.GetOptions{})
if err != nil {
return false
}
return work != nil && meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, workv1alpha1.WorkDispatching, metav1.ConditionFalse)
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
})
})