Merge pull request #4356 from jwcesign/consumption-cluster-addition-sync
feat: dispatch eps to the newly joined consumption clusters
This commit is contained in:
commit
1cafe7fb8f
|
@ -40,6 +40,7 @@ import (
|
|||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
|
||||
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
|
||||
"github.com/karmada-io/karmada/pkg/events"
|
||||
|
@ -169,9 +170,73 @@ func (c *EndpointsliceDispatchController) SetupWithManager(mgr controllerruntime
|
|||
}
|
||||
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}, builder.WithPredicates(workPredicateFun)).
|
||||
Watches(&networkingv1alpha1.MultiClusterService{}, handler.EnqueueRequestsFromMapFunc(c.newMultiClusterServiceFunc())).
|
||||
Watches(&clusterv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(c.newClusterFunc())).
|
||||
Complete(c)
|
||||
}
|
||||
|
||||
func (c *EndpointsliceDispatchController) newClusterFunc() handler.MapFunc {
|
||||
return func(ctx context.Context, a client.Object) []reconcile.Request {
|
||||
var clusterName string
|
||||
switch t := a.(type) {
|
||||
case *clusterv1alpha1.Cluster:
|
||||
clusterName = t.Name
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
mcsList := &networkingv1alpha1.MultiClusterServiceList{}
|
||||
if err := c.Client.List(context.TODO(), mcsList, &client.ListOptions{}); err != nil {
|
||||
klog.Errorf("Failed to list MultiClusterService, error: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var requests []reconcile.Request
|
||||
for _, mcs := range mcsList.Items {
|
||||
clusterSet, err := helper.GetConsumptionClustres(c.Client, mcs.DeepCopy())
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get provision clusters, error: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !clusterSet.Has(clusterName) {
|
||||
continue
|
||||
}
|
||||
|
||||
workList, err := c.getClusterEndpointSliceWorks(mcs.Namespace, mcs.Name)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to list work, error: %v", err)
|
||||
continue
|
||||
}
|
||||
for _, work := range workList {
|
||||
// This annotation is only added to the EndpointSlice work in consumption clusters' execution namespace
|
||||
// Here, when new cluster joins to Karmada and it's in the consumption cluster set, we need to re-sync the EndpointSlice work
|
||||
// from provision cluster to the newly joined cluster
|
||||
if util.GetLabelValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" {
|
||||
continue
|
||||
}
|
||||
|
||||
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: work.Namespace, Name: work.Name}})
|
||||
}
|
||||
}
|
||||
return requests
|
||||
}
|
||||
}
|
||||
|
||||
func (c *EndpointsliceDispatchController) getClusterEndpointSliceWorks(mcsNamespace, mcsName string) ([]workv1alpha1.Work, error) {
|
||||
workList := &workv1alpha1.WorkList{}
|
||||
if err := c.Client.List(context.TODO(), workList, &client.ListOptions{
|
||||
LabelSelector: labels.SelectorFromSet(map[string]string{
|
||||
util.MultiClusterServiceNameLabel: mcsName,
|
||||
util.MultiClusterServiceNamespaceLabel: mcsNamespace,
|
||||
}),
|
||||
}); err != nil {
|
||||
klog.Errorf("Failed to list work, error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return workList.Items, nil
|
||||
}
|
||||
|
||||
func (c *EndpointsliceDispatchController) newMultiClusterServiceFunc() handler.MapFunc {
|
||||
return func(ctx context.Context, a client.Object) []reconcile.Request {
|
||||
var mcsName, mcsNamespace string
|
||||
|
@ -183,19 +248,14 @@ func (c *EndpointsliceDispatchController) newMultiClusterServiceFunc() handler.M
|
|||
return nil
|
||||
}
|
||||
|
||||
workList := &workv1alpha1.WorkList{}
|
||||
if err := c.Client.List(context.TODO(), workList, &client.ListOptions{
|
||||
LabelSelector: labels.SelectorFromSet(map[string]string{
|
||||
util.MultiClusterServiceNameLabel: mcsName,
|
||||
util.MultiClusterServiceNamespaceLabel: mcsNamespace,
|
||||
}),
|
||||
}); err != nil {
|
||||
workList, err := c.getClusterEndpointSliceWorks(mcsNamespace, mcsName)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to list work, error: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var requests []reconcile.Request
|
||||
for _, work := range workList.Items {
|
||||
for _, work := range workList {
|
||||
// We only care about the EndpointSlice work from provision clusters
|
||||
if util.GetLabelValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" {
|
||||
continue
|
||||
|
|
|
@ -88,7 +88,7 @@ const (
|
|||
// The overrides items should be sorted alphabetically in ascending order by ClusterOverridePolicy's name.
|
||||
AppliedClusterOverrides = "policy.karmada.io/applied-cluster-overrides"
|
||||
|
||||
// EndpointSliceProvisionClusterAnnotation is added to EndpointSlice to specify the cluster which cluster provides the EndpointSlice.
|
||||
// EndPointSliceProvisionClusterAnnotation is added to work of the dispatch EndpointSlice in consumption clusters's namespace.
|
||||
EndpointSliceProvisionClusterAnnotation = "endpointslice.karmada.io/provision-cluster"
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue