Merge pull request #4360 from Rains6/dev-mcs-zhc
feat: sync mcs work when cluster joined or unjoin
This commit is contained in:
commit
0b1681ed83
|
@ -95,6 +95,7 @@ func (c *MCSController) Reconcile(ctx context.Context, req controllerruntime.Req
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_ = c.updateMCSStatus(mcs, metav1.ConditionTrue, "ServiceAppliedSucceed", "Service is propagated to target clusters.")
|
_ = c.updateMCSStatus(mcs, metav1.ConditionTrue, "ServiceAppliedSucceed", "Service is propagated to target clusters.")
|
||||||
|
c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, "Service is propagated to target clusters.")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if err = c.handleMCSCreateOrUpdate(ctx, mcs.DeepCopy()); err != nil {
|
if err = c.handleMCSCreateOrUpdate(ctx, mcs.DeepCopy()); err != nil {
|
||||||
|
@ -191,13 +192,13 @@ func (c *MCSController) deleteMultiClusterServiceWork(mcs *networkingv1alpha1.Mu
|
||||||
klog.Errorf("Failed to get member cluster name for work %s/%s:%v", work.Namespace, work.Name, work)
|
klog.Errorf("Failed to get member cluster name for work %s/%s:%v", work.Namespace, work.Name, work)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !deleteAll && provisionClusters.Has(clusterName) {
|
|
||||||
|
if !deleteAll && provisionClusters.Has(clusterName) && c.IsClusterReady(clusterName) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
|
if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
|
||||||
klog.Errorf("Error while updating work(%s/%s) deletion timestamp: %s",
|
klog.Errorf("Error while deleting work(%s/%s): %v", work.Namespace, work.Name, err)
|
||||||
work.Namespace, work.Name, err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -275,13 +276,16 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *MCSController) ensureMultiClusterServiceWork(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
|
func (c *MCSController) ensureMultiClusterServiceWork(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
|
||||||
provisionCluster, err := helper.GetProvisionClusters(c.Client, mcs)
|
provisionClusters, err := helper.GetProvisionClusters(c.Client, mcs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to get provision clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
|
klog.Errorf("Failed to get provision clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for clusterName := range provisionCluster {
|
for clusterName := range provisionClusters {
|
||||||
|
if !c.IsClusterReady(clusterName) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
workMeta := metav1.ObjectMeta{
|
workMeta := metav1.ObjectMeta{
|
||||||
Name: names.GenerateMCSWorkName(mcs.Kind, mcs.Name, mcs.Namespace, clusterName),
|
Name: names.GenerateMCSWorkName(mcs.Kind, mcs.Name, mcs.Namespace, clusterName),
|
||||||
Namespace: names.GenerateExecutionSpaceName(clusterName),
|
Namespace: names.GenerateExecutionSpaceName(clusterName),
|
||||||
|
@ -392,6 +396,16 @@ func (c *MCSController) updateMCSStatus(mcs *networkingv1alpha1.MultiClusterServ
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *MCSController) IsClusterReady(clusterName string) bool {
|
||||||
|
cluster := &clusterv1alpha1.Cluster{}
|
||||||
|
if err := c.Client.Get(context.TODO(), types.NamespacedName{Name: clusterName}, cluster); err != nil {
|
||||||
|
klog.ErrorS(err, "failed to get cluster object", "Name", clusterName)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return util.IsClusterReady(&cluster.Status)
|
||||||
|
}
|
||||||
|
|
||||||
// SetupWithManager creates a controller and register to controller manager.
|
// SetupWithManager creates a controller and register to controller manager.
|
||||||
func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error {
|
func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error {
|
||||||
mcsPredicateFunc := predicate.Funcs{
|
mcsPredicateFunc := predicate.Funcs{
|
||||||
|
@ -460,6 +474,47 @@ func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error {
|
||||||
return controllerruntime.NewControllerManagedBy(mgr).
|
return controllerruntime.NewControllerManagedBy(mgr).
|
||||||
For(&networkingv1alpha1.MultiClusterService{}, builder.WithPredicates(mcsPredicateFunc)).
|
For(&networkingv1alpha1.MultiClusterService{}, builder.WithPredicates(mcsPredicateFunc)).
|
||||||
Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(svcMapFunc), builder.WithPredicates(svcPredicateFunc)).
|
Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(svcMapFunc), builder.WithPredicates(svcPredicateFunc)).
|
||||||
|
Watches(&clusterv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(c.clusterMapFunc())).
|
||||||
WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}).
|
WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}).
|
||||||
Complete(c)
|
Complete(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *MCSController) clusterMapFunc() handler.MapFunc {
|
||||||
|
return func(ctx context.Context, a client.Object) []reconcile.Request {
|
||||||
|
var clusterName string
|
||||||
|
switch t := a.(type) {
|
||||||
|
case *clusterv1alpha1.Cluster:
|
||||||
|
clusterName = t.Name
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
klog.V(4).Infof("Begin to sync mcs with cluster %s.", clusterName)
|
||||||
|
mcsList := &networkingv1alpha1.MultiClusterServiceList{}
|
||||||
|
if err := c.Client.List(context.TODO(), mcsList, &client.ListOptions{}); err != nil {
|
||||||
|
klog.Errorf("Failed to list MultiClusterService, error: %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var requests []reconcile.Request
|
||||||
|
for index := range mcsList.Items {
|
||||||
|
if !needSyncMultiClusterService(&mcsList.Items[index], clusterName) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: mcsList.Items[index].Namespace,
|
||||||
|
Name: mcsList.Items[index].Name}})
|
||||||
|
}
|
||||||
|
|
||||||
|
return requests
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func needSyncMultiClusterService(mcs *networkingv1alpha1.MultiClusterService, clusterName string) bool {
|
||||||
|
if len(mcs.Spec.ServiceProvisionClusters) == 0 || len(mcs.Spec.ServiceConsumptionClusters) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
clusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
|
||||||
|
clusters.Insert(mcs.Spec.ServiceConsumptionClusters...)
|
||||||
|
return clusters.Has(clusterName)
|
||||||
|
}
|
||||||
|
|
|
@ -108,26 +108,26 @@ func MultiClusterServiceCrossClusterEnabled(mcs *networkingv1alpha1.MultiCluster
|
||||||
|
|
||||||
func GetProvisionClusters(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
|
func GetProvisionClusters(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
|
||||||
provisionClusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
|
provisionClusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
|
||||||
if len(provisionClusters) == 0 {
|
existClusters, err := util.GetClusterSet(client)
|
||||||
var err error
|
|
||||||
provisionClusters, err = util.GetClusterSet(client)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to get cluster set, Error: %v", err)
|
klog.Errorf("Failed to get cluster set, Error: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if len(provisionClusters) == 0 {
|
||||||
|
return existClusters, nil
|
||||||
}
|
}
|
||||||
return provisionClusters, nil
|
return provisionClusters.Intersection(existClusters), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetConsumptionClustres(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
|
func GetConsumptionClustres(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
|
||||||
consumptionClusters := sets.New[string](mcs.Spec.ServiceConsumptionClusters...)
|
consumptionClusters := sets.New[string](mcs.Spec.ServiceConsumptionClusters...)
|
||||||
if len(consumptionClusters) == 0 {
|
existClusters, err := util.GetClusterSet(client)
|
||||||
var err error
|
|
||||||
consumptionClusters, err = util.GetClusterSet(client)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to get cluster set, Error: %v", err)
|
klog.Errorf("Failed to get cluster set, Error: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if len(consumptionClusters) == 0 {
|
||||||
|
return existClusters, nil
|
||||||
}
|
}
|
||||||
return consumptionClusters, nil
|
return consumptionClusters.Intersection(existClusters), nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue