Merge pull request #4343 from jwcesign/clean-eps-when-mcs-changes
feat: clean the endpointslice if the current cluster is not in the mcs consumption clusters
This commit is contained in:
commit
68d937e716
|
@ -140,6 +140,11 @@ func (c *EndpointSliceCollectController) cleanWorkWithMCSDelete(work *workv1alph
|
|||
if !helper.IsWorkContains(work.Spec.Workload.Manifests, endpointSliceGVK) {
|
||||
continue
|
||||
}
|
||||
// We only care about the EndpointSlice work in provision clusters
|
||||
if util.GetAnnotationValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := c.Delete(context.TODO(), work.DeepCopy()); err != nil {
|
||||
klog.Errorf("Failed to delete work(%s/%s), Error: %v", work.Namespace, work.Name, err)
|
||||
errs = append(errs, err)
|
||||
|
@ -334,7 +339,7 @@ func (c *EndpointSliceCollectController) handleEndpointSliceEvent(endpointSliceK
|
|||
return err
|
||||
}
|
||||
|
||||
if util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceControllerLabelValue {
|
||||
if util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceDispatchControllerLabelValue {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -393,7 +398,7 @@ func (c *EndpointSliceCollectController) collectTargetEndpointSlice(work *workv1
|
|||
klog.Errorf("Failed to convert object to EndpointSlice, error: %v", err)
|
||||
return err
|
||||
}
|
||||
if util.GetLabelValue(eps.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceControllerLabelValue {
|
||||
if util.GetLabelValue(eps.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceDispatchControllerLabelValue {
|
||||
continue
|
||||
}
|
||||
epsUnstructured, err := helper.ToUnstructured(eps)
|
||||
|
|
|
@ -18,7 +18,6 @@ package multiclusterservice
|
|||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
|
@ -26,6 +25,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
@ -36,7 +36,9 @@ import (
|
|||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
"sigs.k8s.io/controller-runtime/pkg/event"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
|
||||
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
|
||||
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
|
||||
|
@ -79,7 +81,7 @@ func (c *EndpointsliceDispatchController) Reconcile(ctx context.Context, req con
|
|||
|
||||
if !work.DeletionTimestamp.IsZero() {
|
||||
if err := c.cleanupEndpointSliceFromConsumerClusters(ctx, work); err != nil {
|
||||
klog.Errorf("Failed to cleanup EndpointSlice from consumer clusters for work %s/%s", work.Namespace, work.Name)
|
||||
klog.Errorf("Failed to cleanup EndpointSlice from consumer clusters for work %s/%s:%v", work.Namespace, work.Name, err)
|
||||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
return controllerruntime.Result{}, nil
|
||||
|
@ -107,6 +109,10 @@ func (c *EndpointsliceDispatchController) Reconcile(ctx context.Context, req con
|
|||
c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonDispatchEndpointSliceSucceed, "EndpointSlice are synced successfully")
|
||||
}()
|
||||
|
||||
if err = c.cleanOrphanDispatchedEndpointSlice(ctx, mcs); err != nil {
|
||||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
if err = c.syncEndpointSlice(ctx, work.DeepCopy(), mcs); err != nil {
|
||||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
@ -143,19 +149,104 @@ func (c *EndpointsliceDispatchController) updateEndpointSliceSynced(mcs *network
|
|||
func (c *EndpointsliceDispatchController) SetupWithManager(mgr controllerruntime.Manager) error {
|
||||
workPredicateFun := predicate.Funcs{
|
||||
CreateFunc: func(createEvent event.CreateEvent) bool {
|
||||
return util.GetLabelValue(createEvent.Object.GetLabels(), util.ServiceNameLabel) != ""
|
||||
// We only care about the EndpointSlice work from provision clusters
|
||||
return util.GetLabelValue(createEvent.Object.GetLabels(), util.ServiceNameLabel) != "" &&
|
||||
util.GetAnnotationValue(createEvent.Object.GetAnnotations(), util.EndpointSliceProvisionClusterAnnotation) == ""
|
||||
},
|
||||
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
|
||||
return util.GetLabelValue(updateEvent.ObjectNew.GetLabels(), util.ServiceNameLabel) != ""
|
||||
// We only care about the EndpointSlice work from provision clusters
|
||||
return util.GetLabelValue(updateEvent.ObjectNew.GetLabels(), util.ServiceNameLabel) != "" &&
|
||||
util.GetAnnotationValue(updateEvent.ObjectNew.GetAnnotations(), util.EndpointSliceProvisionClusterAnnotation) == ""
|
||||
},
|
||||
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
|
||||
return util.GetLabelValue(deleteEvent.Object.GetLabels(), util.ServiceNameLabel) != ""
|
||||
// We only care about the EndpointSlice work from provision clusters
|
||||
return util.GetLabelValue(deleteEvent.Object.GetLabels(), util.ServiceNameLabel) != "" &&
|
||||
util.GetAnnotationValue(deleteEvent.Object.GetAnnotations(), util.EndpointSliceProvisionClusterAnnotation) == ""
|
||||
},
|
||||
GenericFunc: func(genericEvent event.GenericEvent) bool {
|
||||
return false
|
||||
},
|
||||
}
|
||||
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}, builder.WithPredicates(workPredicateFun)).Complete(c)
|
||||
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}, builder.WithPredicates(workPredicateFun)).
|
||||
Watches(&networkingv1alpha1.MultiClusterService{}, handler.EnqueueRequestsFromMapFunc(c.newMultiClusterServiceFunc())).
|
||||
Complete(c)
|
||||
}
|
||||
|
||||
func (c *EndpointsliceDispatchController) newMultiClusterServiceFunc() handler.MapFunc {
|
||||
return func(ctx context.Context, a client.Object) []reconcile.Request {
|
||||
var mcsName, mcsNamespace string
|
||||
switch t := a.(type) {
|
||||
case *networkingv1alpha1.MultiClusterService:
|
||||
mcsNamespace = t.Namespace
|
||||
mcsName = t.Name
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
workList := &workv1alpha1.WorkList{}
|
||||
if err := c.Client.List(context.TODO(), workList, &client.ListOptions{
|
||||
LabelSelector: labels.SelectorFromSet(map[string]string{
|
||||
util.ServiceNameLabel: mcsName,
|
||||
util.ServiceNamespaceLabel: mcsNamespace,
|
||||
}),
|
||||
}); err != nil {
|
||||
klog.Errorf("Failed to list work, error: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var requests []reconcile.Request
|
||||
for _, work := range workList.Items {
|
||||
// We only care about the EndpointSlice work from provision clusters
|
||||
if util.GetLabelValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" {
|
||||
continue
|
||||
}
|
||||
|
||||
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: work.Namespace, Name: work.Name}})
|
||||
}
|
||||
return requests
|
||||
}
|
||||
}
|
||||
|
||||
func (c *EndpointsliceDispatchController) cleanOrphanDispatchedEndpointSlice(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
|
||||
workList := &workv1alpha1.WorkList{}
|
||||
if err := c.Client.List(ctx, workList, &client.ListOptions{
|
||||
LabelSelector: labels.SelectorFromSet(map[string]string{
|
||||
util.ServiceNameLabel: mcs.Name,
|
||||
util.ServiceNamespaceLabel: mcs.Namespace,
|
||||
})}); err != nil {
|
||||
klog.Errorf("Failed to list works, error is: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, work := range workList.Items {
|
||||
// We only care about the EndpointSlice work in consumption clusters
|
||||
if util.GetAnnotationValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
consumptionClusters, err := helper.GetConsumptionClustres(c.Client, mcs)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get consumption clusters, error is: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
cluster, err := names.GetClusterName(work.Namespace)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster name for work %s/%s", work.Namespace, work.Name)
|
||||
return err
|
||||
}
|
||||
|
||||
if consumptionClusters.Has(cluster) {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := c.Client.Delete(ctx, work.DeepCopy()); err != nil {
|
||||
klog.Errorf("Failed to delete work %s/%s, error is: %v", work.Namespace, work.Name, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *EndpointsliceDispatchController) syncEndpointSlice(ctx context.Context, work *workv1alpha1.Work, mcs *networkingv1alpha1.MultiClusterService) error {
|
||||
|
@ -205,12 +296,11 @@ func (c *EndpointsliceDispatchController) syncEndpointSlice(ctx context.Context,
|
|||
workv1alpha1.WorkNamespaceLabel: clusterNamespace,
|
||||
workv1alpha1.WorkNameLabel: work.Name,
|
||||
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
|
||||
discoveryv1.LabelManagedBy: util.EndpointSliceControllerLabelValue,
|
||||
discoveryv1.LabelManagedBy: util.EndpointSliceDispatchControllerLabelValue,
|
||||
}
|
||||
endpointSlice.Annotations = map[string]string{
|
||||
// This annotation is used to identify the source cluster of EndpointSlice and whether the eps are the newest version
|
||||
util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster,
|
||||
util.EndPointSliceProvisionGenerationAnnotation: strconv.FormatInt(endpointSlice.Generation, 10),
|
||||
util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster,
|
||||
}
|
||||
|
||||
workMeta := metav1.ObjectMeta{
|
||||
|
@ -222,6 +312,8 @@ func (c *EndpointsliceDispatchController) syncEndpointSlice(ctx context.Context,
|
|||
},
|
||||
Labels: map[string]string{
|
||||
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
|
||||
util.ServiceNameLabel: mcs.Name,
|
||||
util.ServiceNamespaceLabel: mcs.Namespace,
|
||||
},
|
||||
}
|
||||
unstructuredEPS, err := helper.ToUnstructured(endpointSlice)
|
||||
|
|
|
@ -51,8 +51,8 @@ const (
|
|||
// ManagedByKarmadaLabelValue indicates that resources are managed by karmada controllers.
|
||||
ManagedByKarmadaLabelValue = "true"
|
||||
|
||||
// EndpointSliceControllerLabelValue indicates the endpointSlice are controlled by karmada-mcs-endpointslice-controller
|
||||
EndpointSliceControllerLabelValue = "karmada-mcs-endpointslice-controller"
|
||||
// EndpointSliceDispatchControllerLabelValue indicates the endpointSlice are controlled by Karmada
|
||||
EndpointSliceDispatchControllerLabelValue = "endpointslice-dispatch-controller.karmada.io"
|
||||
|
||||
// RetainReplicasLabel is a reserved label to indicate whether the replicas should be retained. e.g:
|
||||
// resourcetemplate.karmada.io/retain-replicas: true // with value `true` indicates retain
|
||||
|
|
|
@ -118,3 +118,16 @@ func GetProvisionClusters(client client.Client, mcs *networkingv1alpha1.MultiClu
|
|||
}
|
||||
return provisionClusters, nil
|
||||
}
|
||||
|
||||
func GetConsumptionClustres(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
|
||||
consumptionClusters := sets.New[string](mcs.Spec.ServiceConsumptionClusters...)
|
||||
if len(consumptionClusters) == 0 {
|
||||
var err error
|
||||
consumptionClusters, err = util.GetClusterSet(client)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster set, Error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return consumptionClusters, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue