fix: work status sync when work dispatching is suspended

Signed-off-by: Amir Alavi <amiralavi7@gmail.com>
This commit is contained in:
Amir Alavi 2024-08-19 11:34:15 -04:00
parent 6318541d18
commit 5f54261725
No known key found for this signature in database
GPG Key ID: A82919F366EE978E
2 changed files with 55 additions and 35 deletions

View File

@ -222,12 +222,25 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
return nil return nil
} }
if err := c.updateResource(ctx, observedObj, workObject, fedKey); err != nil {
return err
}
klog.Infof("Reflecting the resource(kind=%s, %s/%s) status to the Work(%s/%s).", observedObj.GetKind(), observedObj.GetNamespace(), observedObj.GetName(), workNamespace, workName)
return c.reflectStatus(ctx, workObject, observedObj)
}
func (c *WorkStatusController) updateResource(ctx context.Context, observedObj *unstructured.Unstructured, workObject *workv1alpha1.Work, fedKey keys.FederatedKey) error {
if helper.IsWorkSuspendDispatching(workObject) {
return nil
}
desiredObj, err := c.getRawManifest(workObject.Spec.Workload.Manifests, observedObj) desiredObj, err := c.getRawManifest(workObject.Spec.Workload.Manifests, observedObj)
if err != nil { if err != nil {
return err return err
} }
clusterName, err := names.GetClusterName(workNamespace) clusterName, err := names.GetClusterName(workObject.Namespace)
if err != nil { if err != nil {
klog.Errorf("Failed to get member cluster name: %v", err) klog.Errorf("Failed to get member cluster name: %v", err)
return err return err
@ -255,9 +268,7 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
// also needs to update again. The update operation will be a non-operation if the event triggered by Service's // also needs to update again. The update operation will be a non-operation if the event triggered by Service's
// status changes. // status changes.
} }
return nil
klog.Infof("Reflecting the resource(kind=%s, %s/%s) status to the Work(%s/%s).", observedObj.GetKind(), observedObj.GetNamespace(), observedObj.GetName(), workNamespace, workName)
return c.reflectStatus(ctx, workObject, observedObj)
} }
func (c *WorkStatusController) handleDeleteEvent(ctx context.Context, key keys.FederatedKey) error { func (c *WorkStatusController) handleDeleteEvent(ctx context.Context, key keys.FederatedKey) error {

View File

@ -32,16 +32,16 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
dynamicfake "k8s.io/client-go/dynamic/fake" dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/informers"
kubernetesfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime" controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/fake"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/resourceinterpreter" "github.com/karmada-io/karmada/pkg/resourceinterpreter/default/native"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
@ -571,10 +571,9 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod *corev1.Pod pod *corev1.Pod
raw []byte raw []byte
controllerWithoutInformer bool controllerWithoutInformer bool
workWithRightNS bool
expectedError bool expectedError bool
workWithDeletionTimestamp bool
wrongWorkNS bool wrongWorkNS bool
workApplyFunc func(work *workv1alpha1.Work)
}{ }{
{ {
name: "failed to exec NeedUpdate", name: "failed to exec NeedUpdate",
@ -582,7 +581,6 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName), pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true, controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: true, expectedError: true,
}, },
{ {
@ -591,7 +589,6 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName), pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true, controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: true, expectedError: true,
}, },
{ {
@ -600,7 +597,6 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName), pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: false, controllerWithoutInformer: false,
workWithRightNS: true,
expectedError: true, expectedError: true,
}, },
{ {
@ -608,7 +604,6 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
obj: newPodObj("karmada-es-cluster"), obj: newPodObj("karmada-es-cluster"),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true, controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: false, expectedError: false,
}, },
{ {
@ -617,7 +612,6 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName, true), pod: newPod(workNs, workName, true),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true, controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: false, expectedError: false,
}, },
{ {
@ -626,8 +620,10 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName), pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true, controllerWithoutInformer: true,
workWithRightNS: false,
expectedError: false, expectedError: false,
workApplyFunc: func(work *workv1alpha1.Work) {
work.SetName(fmt.Sprintf("%v-test", workNs))
},
}, },
{ {
name: "failed to getRawManifest, wrong Manifests in work", name: "failed to getRawManifest, wrong Manifests in work",
@ -635,7 +631,6 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName), pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod1","namespace":"default"}}`), raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod1","namespace":"default"}}`),
controllerWithoutInformer: true, controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: true, expectedError: true,
}, },
{ {
@ -644,10 +639,31 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName), pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`), raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true, controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: true, expectedError: true,
wrongWorkNS: true, wrongWorkNS: true,
}, },
{
name: "skips work with suspend dispatching",
obj: newPodObj("karmada-es-cluster"),
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
expectedError: false,
workApplyFunc: func(work *workv1alpha1.Work) {
work.Spec.SuspendDispatching = ptr.To(true)
},
},
{
name: "skips work with deletion timestamp",
obj: newPodObj("karmada-es-cluster"),
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
expectedError: false,
workApplyFunc: func(work *workv1alpha1.Work) {
work.SetDeletionTimestamp(ptr.To(metav1.Now()))
},
},
} }
for _, tt := range tests { for _, tt := range tests {
@ -671,11 +687,9 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
c = newWorkStatusController(cluster) c = newWorkStatusController(cluster)
} }
var work *workv1alpha1.Work work := testhelper.NewWork(workName, workNs, workUID, tt.raw)
if tt.workWithRightNS { if tt.workApplyFunc != nil {
work = testhelper.NewWork(workName, workNs, workUID, tt.raw) tt.workApplyFunc(work)
} else {
work = testhelper.NewWork(workName, fmt.Sprintf("%v-test", workNs), workUID, tt.raw)
} }
key, _ := generateKey(tt.obj) key, _ := generateKey(tt.obj)
@ -686,9 +700,9 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
err := c.syncWorkStatus(key) err := c.syncWorkStatus(key)
if tt.expectedError { if tt.expectedError {
assert.NotEmpty(t, err) assert.Error(t, err)
} else { } else {
assert.Empty(t, err) assert.NoError(t, err)
} }
}) })
} }
@ -696,13 +710,14 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets ...*dynamicfake.FakeDynamicClient) WorkStatusController { func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets ...*dynamicfake.FakeDynamicClient) WorkStatusController {
c := WorkStatusController{ c := WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).Build(), Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).WithStatusSubresource().Build(),
InformerManager: genericmanager.GetInstance(), InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"), PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{}, ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{}, RateLimiterOptions: ratelimiterflag.Options{},
eventHandler: nil, eventHandler: nil,
EventRecorder: record.NewFakeRecorder(1024),
RESTMapper: func() meta.RESTMapper { RESTMapper: func() meta.RESTMapper {
m := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion}) m := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
m.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace) m.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace)
@ -711,21 +726,15 @@ func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets
} }
if len(dynamicClientSets) > 0 { if len(dynamicClientSets) > 0 {
c.ResourceInterpreter = FakeResourceInterpreter{DefaultInterpreter: native.NewDefaultInterpreter()}
c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, c.RESTMapper, util.NewClusterDynamicClientSetForAgent, c.ResourceInterpreter)
// Generate InformerManager
clusterName := cluster.Name clusterName := cluster.Name
dynamicClientSet := dynamicClientSets[0] dynamicClientSet := dynamicClientSets[0]
// Generate ResourceInterpreter and ObjectWatcher // Generate ResourceInterpreter and ObjectWatcher
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopCh)
controlPlaneKubeClientSet := kubernetesfake.NewSimpleClientset()
sharedFactory := informers.NewSharedInformerFactory(controlPlaneKubeClientSet, 0)
serviceLister := sharedFactory.Core().V1().Services().Lister()
c.ResourceInterpreter = resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, c.RESTMapper, util.NewClusterDynamicClientSetForAgent, c.ResourceInterpreter)
// Generate InformerManager
m := genericmanager.NewMultiClusterInformerManager(stopCh) m := genericmanager.NewMultiClusterInformerManager(stopCh)
m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer
m.Start(clusterName) m.Start(clusterName)