List&watch cluster creation/deletion, reconcile the work in the corresponding cluster execution namespace
Signed-off-by: HongCen <zhouhongcen1@huawei.com>
This commit is contained in:
parent
27a3e54b5b
commit
8277a985b0
|
@ -95,6 +95,7 @@ func (c *MCSController) Reconcile(ctx context.Context, req controllerruntime.Req
|
|||
return
|
||||
}
|
||||
_ = c.updateMCSStatus(mcs, metav1.ConditionTrue, "ServiceAppliedSucceed", "Service is propagated to target clusters.")
|
||||
c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, "Service is propagated to target clusters.")
|
||||
}()
|
||||
|
||||
if err = c.handleMCSCreateOrUpdate(ctx, mcs.DeepCopy()); err != nil {
|
||||
|
@ -191,13 +192,13 @@ func (c *MCSController) deleteMultiClusterServiceWork(mcs *networkingv1alpha1.Mu
|
|||
klog.Errorf("Failed to get member cluster name for work %s/%s:%v", work.Namespace, work.Name, work)
|
||||
continue
|
||||
}
|
||||
if !deleteAll && provisionClusters.Has(clusterName) {
|
||||
|
||||
if !deleteAll && provisionClusters.Has(clusterName) && c.IsClusterReady(clusterName) {
|
||||
continue
|
||||
}
|
||||
|
||||
if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
|
||||
klog.Errorf("Error while updating work(%s/%s) deletion timestamp: %s",
|
||||
work.Namespace, work.Name, err)
|
||||
klog.Errorf("Error while deleting work(%s/%s): %v", work.Namespace, work.Name, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -275,13 +276,16 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ
|
|||
}
|
||||
|
||||
func (c *MCSController) ensureMultiClusterServiceWork(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
|
||||
provisionCluster, err := helper.GetProvisionClusters(c.Client, mcs)
|
||||
provisionClusters, err := helper.GetProvisionClusters(c.Client, mcs)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get provision clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
for clusterName := range provisionCluster {
|
||||
for clusterName := range provisionClusters {
|
||||
if !c.IsClusterReady(clusterName) {
|
||||
continue
|
||||
}
|
||||
workMeta := metav1.ObjectMeta{
|
||||
Name: names.GenerateMCSWorkName(mcs.Kind, mcs.Name, mcs.Namespace, clusterName),
|
||||
Namespace: names.GenerateExecutionSpaceName(clusterName),
|
||||
|
@ -392,6 +396,16 @@ func (c *MCSController) updateMCSStatus(mcs *networkingv1alpha1.MultiClusterServ
|
|||
})
|
||||
}
|
||||
|
||||
func (c *MCSController) IsClusterReady(clusterName string) bool {
|
||||
cluster := &clusterv1alpha1.Cluster{}
|
||||
if err := c.Client.Get(context.TODO(), types.NamespacedName{Name: clusterName}, cluster); err != nil {
|
||||
klog.ErrorS(err, "failed to get cluster object", "Name", clusterName)
|
||||
return false
|
||||
}
|
||||
|
||||
return util.IsClusterReady(&cluster.Status)
|
||||
}
|
||||
|
||||
// SetupWithManager creates a controller and register to controller manager.
|
||||
func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error {
|
||||
mcsPredicateFunc := predicate.Funcs{
|
||||
|
@ -460,6 +474,47 @@ func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error {
|
|||
return controllerruntime.NewControllerManagedBy(mgr).
|
||||
For(&networkingv1alpha1.MultiClusterService{}, builder.WithPredicates(mcsPredicateFunc)).
|
||||
Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(svcMapFunc), builder.WithPredicates(svcPredicateFunc)).
|
||||
Watches(&clusterv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(c.clusterMapFunc())).
|
||||
WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}).
|
||||
Complete(c)
|
||||
}
|
||||
|
||||
func (c *MCSController) clusterMapFunc() handler.MapFunc {
|
||||
return func(ctx context.Context, a client.Object) []reconcile.Request {
|
||||
var clusterName string
|
||||
switch t := a.(type) {
|
||||
case *clusterv1alpha1.Cluster:
|
||||
clusterName = t.Name
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
klog.V(4).Infof("Begin to sync mcs with cluster %s.", clusterName)
|
||||
mcsList := &networkingv1alpha1.MultiClusterServiceList{}
|
||||
if err := c.Client.List(context.TODO(), mcsList, &client.ListOptions{}); err != nil {
|
||||
klog.Errorf("Failed to list MultiClusterService, error: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var requests []reconcile.Request
|
||||
for index := range mcsList.Items {
|
||||
if !needSyncMultiClusterService(&mcsList.Items[index], clusterName) {
|
||||
continue
|
||||
}
|
||||
|
||||
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: mcsList.Items[index].Namespace,
|
||||
Name: mcsList.Items[index].Name}})
|
||||
}
|
||||
|
||||
return requests
|
||||
}
|
||||
}
|
||||
|
||||
func needSyncMultiClusterService(mcs *networkingv1alpha1.MultiClusterService, clusterName string) bool {
|
||||
if len(mcs.Spec.ServiceProvisionClusters) == 0 || len(mcs.Spec.ServiceConsumptionClusters) == 0 {
|
||||
return true
|
||||
}
|
||||
clusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
|
||||
clusters.Insert(mcs.Spec.ServiceConsumptionClusters...)
|
||||
return clusters.Has(clusterName)
|
||||
}
|
||||
|
|
|
@ -108,26 +108,26 @@ func MultiClusterServiceCrossClusterEnabled(mcs *networkingv1alpha1.MultiCluster
|
|||
|
||||
func GetProvisionClusters(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
|
||||
provisionClusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
|
||||
if len(provisionClusters) == 0 {
|
||||
var err error
|
||||
provisionClusters, err = util.GetClusterSet(client)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster set, Error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
existClusters, err := util.GetClusterSet(client)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster set, Error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
return provisionClusters, nil
|
||||
if len(provisionClusters) == 0 {
|
||||
return existClusters, nil
|
||||
}
|
||||
return provisionClusters.Intersection(existClusters), nil
|
||||
}
|
||||
|
||||
func GetConsumptionClustres(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
|
||||
consumptionClusters := sets.New[string](mcs.Spec.ServiceConsumptionClusters...)
|
||||
if len(consumptionClusters) == 0 {
|
||||
var err error
|
||||
consumptionClusters, err = util.GetClusterSet(client)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster set, Error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
existClusters, err := util.GetClusterSet(client)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster set, Error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
return consumptionClusters, nil
|
||||
if len(consumptionClusters) == 0 {
|
||||
return existClusters, nil
|
||||
}
|
||||
return consumptionClusters.Intersection(existClusters), nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue