Merge pull request #5332 from a7i/work-suspend-event
work suspension: emit event for work dispatch status
This commit is contained in:
commit
b5b065da8a
|
@ -52,6 +52,10 @@ import (
|
|||
const (
|
||||
// ControllerName is the controller name that will be used when reporting events.
|
||||
ControllerName = "execution-controller"
|
||||
// WorkSuspendDispatchingConditionMessage is the condition and event message when dispatching is suspended.
|
||||
WorkSuspendDispatchingConditionMessage = "Work dispatching is in a suspended state."
|
||||
// WorkDispatchingConditionMessage is the condition and event message when dispatching is not suspended.
|
||||
WorkDispatchingConditionMessage = "Work is being dispatched to member clusters."
|
||||
// workSuspendDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is suspended.
|
||||
workSuspendDispatchingConditionReason = "SuspendDispatching"
|
||||
// workDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is not suspended.
|
||||
|
@ -270,18 +274,28 @@ func (c *Controller) updateWorkDispatchingConditionIfNeeded(ctx context.Context,
|
|||
if helper.IsWorkSuspendDispatching(work) {
|
||||
newWorkDispatchingCondition.Status = metav1.ConditionFalse
|
||||
newWorkDispatchingCondition.Reason = workSuspendDispatchingConditionReason
|
||||
newWorkDispatchingCondition.Message = "Work dispatching is in a suspended state."
|
||||
newWorkDispatchingCondition.Message = WorkSuspendDispatchingConditionMessage
|
||||
} else {
|
||||
newWorkDispatchingCondition.Status = metav1.ConditionTrue
|
||||
newWorkDispatchingCondition.Reason = workDispatchingConditionReason
|
||||
newWorkDispatchingCondition.Message = "Work is being dispatched to member clusters."
|
||||
newWorkDispatchingCondition.Message = WorkDispatchingConditionMessage
|
||||
}
|
||||
|
||||
if meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, newWorkDispatchingCondition.Type, newWorkDispatchingCondition.Status) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.setStatusCondition(ctx, work, newWorkDispatchingCondition)
|
||||
if err := c.setStatusCondition(ctx, work, newWorkDispatchingCondition); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
obj, err := helper.ToUnstructured(work)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.eventf(obj, corev1.EventTypeNormal, events.EventReasonWorkDispatching, newWorkDispatchingCondition.Message)
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateAppliedCondition updates the applied condition for the given Work.
|
||||
|
|
|
@ -17,19 +17,24 @@ package execution
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/utils/ptr"
|
||||
controllerruntime "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
|
||||
"github.com/karmada-io/karmada/pkg/events"
|
||||
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
|
||||
"github.com/karmada-io/karmada/pkg/util/gclient"
|
||||
"github.com/karmada-io/karmada/pkg/util/helper"
|
||||
|
@ -38,34 +43,50 @@ import (
|
|||
|
||||
func TestExecutionController_Reconcile(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
work *workv1alpha1.Work
|
||||
ns string
|
||||
expectRes controllerruntime.Result
|
||||
expectCondition *metav1.Condition
|
||||
existErr bool
|
||||
name string
|
||||
work *workv1alpha1.Work
|
||||
ns string
|
||||
expectRes controllerruntime.Result
|
||||
expectCondition *metav1.Condition
|
||||
expectEventMessage string
|
||||
existErr bool
|
||||
}{
|
||||
{
|
||||
name: "work dispatching is suspended, no error, no apply",
|
||||
work: newWork(func(work *workv1alpha1.Work) {
|
||||
work.Spec.SuspendDispatching = ptr.To(true)
|
||||
}),
|
||||
name: "work dispatching is suspended, no error, no apply",
|
||||
ns: "karmada-es-cluster",
|
||||
expectRes: controllerruntime.Result{},
|
||||
existErr: false,
|
||||
work: newWork(func(work *workv1alpha1.Work) {
|
||||
work.Spec.SuspendDispatching = ptr.To(true)
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "work dispatching is suspended, adds false dispatching condition",
|
||||
work: newWork(func(w *workv1alpha1.Work) {
|
||||
w.Spec.SuspendDispatching = ptr.To(true)
|
||||
}),
|
||||
name: "work dispatching is suspended, adds false dispatching condition",
|
||||
ns: "karmada-es-cluster",
|
||||
expectRes: controllerruntime.Result{},
|
||||
expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse},
|
||||
existErr: false,
|
||||
|
||||
work: newWork(func(w *workv1alpha1.Work) {
|
||||
w.Spec.SuspendDispatching = ptr.To(true)
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "work dispatching is suspended, overwrites existing dispatching condition",
|
||||
name: "work dispatching is suspended, adds event message",
|
||||
ns: "karmada-es-cluster",
|
||||
expectRes: controllerruntime.Result{},
|
||||
expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonWorkDispatching, WorkSuspendDispatchingConditionMessage),
|
||||
existErr: false,
|
||||
work: newWork(func(w *workv1alpha1.Work) {
|
||||
w.Spec.SuspendDispatching = ptr.To(true)
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "work dispatching is suspended, overwrites existing dispatching condition",
|
||||
ns: "karmada-es-cluster",
|
||||
expectRes: controllerruntime.Result{},
|
||||
expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse},
|
||||
existErr: false,
|
||||
work: newWork(func(w *workv1alpha1.Work) {
|
||||
w.Spec.SuspendDispatching = ptr.To(true)
|
||||
meta.SetStatusCondition(&w.Status.Conditions, metav1.Condition{
|
||||
|
@ -74,10 +95,6 @@ func TestExecutionController_Reconcile(t *testing.T) {
|
|||
Reason: workDispatchingConditionReason,
|
||||
})
|
||||
}),
|
||||
ns: "karmada-es-cluster",
|
||||
expectRes: controllerruntime.Result{},
|
||||
expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse},
|
||||
existErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -90,7 +107,8 @@ func TestExecutionController_Reconcile(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
c := newController(tt.work)
|
||||
eventRecorder := record.NewFakeRecorder(1)
|
||||
c := newController(tt.work, eventRecorder)
|
||||
res, err := c.Reconcile(context.Background(), req)
|
||||
assert.Equal(t, tt.expectRes, res)
|
||||
if tt.existErr {
|
||||
|
@ -103,21 +121,30 @@ func TestExecutionController_Reconcile(t *testing.T) {
|
|||
assert.NoError(t, c.Client.Get(context.Background(), req.NamespacedName, tt.work))
|
||||
assert.True(t, meta.IsStatusConditionPresentAndEqual(tt.work.Status.Conditions, tt.expectCondition.Type, tt.expectCondition.Status))
|
||||
}
|
||||
|
||||
if tt.expectEventMessage != "" {
|
||||
assert.Equal(t, 1, len(eventRecorder.Events))
|
||||
e := <-eventRecorder.Events
|
||||
assert.Equal(t, tt.expectEventMessage, e)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newController(work *workv1alpha1.Work) Controller {
|
||||
func newController(work *workv1alpha1.Work, eventRecorder *record.FakeRecorder) Controller {
|
||||
cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
|
||||
return Controller{
|
||||
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work).WithStatusSubresource(work).Build(),
|
||||
InformerManager: genericmanager.GetInstance(),
|
||||
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
|
||||
EventRecorder: eventRecorder,
|
||||
}
|
||||
}
|
||||
|
||||
func newWork(applyFunc func(work *workv1alpha1.Work)) *workv1alpha1.Work {
|
||||
work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), nil)
|
||||
pod := testhelper.NewPod("default", "test")
|
||||
bytes, _ := json.Marshal(pod)
|
||||
work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), bytes)
|
||||
if applyFunc != nil {
|
||||
applyFunc(work)
|
||||
}
|
||||
|
|
|
@ -54,6 +54,8 @@ const (
|
|||
EventReasonSyncWorkloadFailed = "SyncFailed"
|
||||
// EventReasonSyncWorkloadSucceed indicates that Sync workload succeed.
|
||||
EventReasonSyncWorkloadSucceed = "SyncSucceed"
|
||||
// EventReasonWorkDispatching indicates that work is dispatching or not.
|
||||
EventReasonWorkDispatching = "WorkDispatching"
|
||||
)
|
||||
|
||||
// Define events for ResourceBinding and ClusterResourceBinding objects.
|
||||
|
|
|
@ -21,11 +21,10 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/duration"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/duration"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
|
||||
"github.com/karmada-io/karmada/pkg/printers"
|
||||
|
|
|
@ -489,7 +489,6 @@ func (cache *cacheImpl) removePod(pod *corev1.Pod) error {
|
|||
n, ok := cache.nodes[pod.Spec.NodeName]
|
||||
if !ok {
|
||||
klog.ErrorS(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "podKey", key, "pod", klog.KObj(pod))
|
||||
|
||||
} else {
|
||||
if err := n.info.RemovePod(pod); err != nil {
|
||||
return err
|
||||
|
|
|
@ -266,7 +266,6 @@ func validateMetricSpec(spec autoscalingv2.MetricSpec, fldPath *field.Path) fiel
|
|||
|
||||
var expectedField string
|
||||
switch spec.Type {
|
||||
|
||||
case autoscalingv2.ObjectMetricSourceType:
|
||||
if spec.Object == nil {
|
||||
allErrs = append(allErrs, field.Required(fldPath.Child("object"), "must populate information for the given metric source"))
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
Copyright 2024 The Karmada Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package framework
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
|
||||
"github.com/onsi/gomega"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
// WaitEventFitWith wait PropagationPolicy sync with fit func.
|
||||
func WaitEventFitWith(kubeClient kubernetes.Interface, namespace string, involvedObj string, fit func(policy corev1.Event) bool) {
|
||||
gomega.Eventually(func() bool {
|
||||
eventList, err := kubeClient.CoreV1().Events(namespace).List(context.TODO(), metav1.ListOptions{
|
||||
FieldSelector: fields.OneTermEqualSelector("involvedObject.name", involvedObj).String(),
|
||||
})
|
||||
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
|
||||
return slices.ContainsFunc(eventList.Items, fit)
|
||||
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
|
||||
}
|
|
@ -44,6 +44,8 @@ import (
|
|||
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
||||
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/execution"
|
||||
"github.com/karmada-io/karmada/pkg/events"
|
||||
"github.com/karmada-io/karmada/pkg/util/helper"
|
||||
"github.com/karmada-io/karmada/pkg/util/names"
|
||||
"github.com/karmada-io/karmada/test/e2e/framework"
|
||||
|
@ -100,6 +102,15 @@ var _ = ginkgo.Describe("[BasicPropagation] propagation testing", func() {
|
|||
return *deployment.Spec.Replicas == updateDeploymentReplicas
|
||||
})
|
||||
})
|
||||
|
||||
ginkgo.It("adds dispatching event with a dispatching message", func() {
|
||||
workName := names.GenerateWorkName(deployment.Kind, deployment.Name, deployment.Namespace)
|
||||
esName := names.GenerateExecutionSpaceName(framework.ClusterNames()[0])
|
||||
framework.WaitEventFitWith(kubeClient, esName, workName, func(event corev1.Event) bool {
|
||||
return event.Reason == events.EventReasonWorkDispatching &&
|
||||
event.Message == execution.WorkDispatchingConditionMessage
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
ginkgo.Context("Service propagation testing", func() {
|
||||
|
@ -1170,7 +1181,7 @@ var _ = ginkgo.Describe("[Suspend] PropagationPolicy testing", func() {
|
|||
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
|
||||
})
|
||||
|
||||
ginkgo.It("adds suspend dispatching condition to Work\"", func() {
|
||||
ginkgo.It("adds suspend dispatching condition to Work", func() {
|
||||
workName := names.GenerateWorkName(deployment.Kind, deployment.Name, deployment.Namespace)
|
||||
esName := names.GenerateExecutionSpaceName(targetMember)
|
||||
gomega.Eventually(func() bool {
|
||||
|
@ -1181,5 +1192,14 @@ var _ = ginkgo.Describe("[Suspend] PropagationPolicy testing", func() {
|
|||
return work != nil && meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, workv1alpha1.WorkDispatching, metav1.ConditionFalse)
|
||||
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
|
||||
})
|
||||
|
||||
ginkgo.It("adds dispatching event with suspend message", func() {
|
||||
workName := names.GenerateWorkName(deployment.Kind, deployment.Name, deployment.Namespace)
|
||||
esName := names.GenerateExecutionSpaceName(targetMember)
|
||||
framework.WaitEventFitWith(kubeClient, esName, workName, func(event corev1.Event) bool {
|
||||
return event.Reason == events.EventReasonWorkDispatching &&
|
||||
event.Message == execution.WorkSuspendDispatchingConditionMessage
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
|
@ -960,6 +960,10 @@ func NewPodDisruptionBudget(namespace, name string, maxUnAvailable intstr.IntOrS
|
|||
// NewWork will build a new Work object.
|
||||
func NewWork(workName, workNs, workUID string, raw []byte) *workv1alpha1.Work {
|
||||
work := &workv1alpha1.Work{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: workv1alpha1.ResourceKindWork,
|
||||
APIVersion: workv1alpha1.GroupVersion.Version,
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: workName,
|
||||
Namespace: workNs,
|
||||
|
|
Loading…
Reference in New Issue