Merge pull request #2880 from helen-frank/feature/buildWorksConcurrency
namespace_sync_controller: buildWorks parallel sync namespace
This commit is contained in:
commit
60a9a649ed
|
@ -2,12 +2,14 @@ package namespace
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/klog/v2"
|
||||
|
@ -107,47 +109,59 @@ func (c *Controller) buildWorks(namespace *corev1.Namespace, clusters []clusterv
|
|||
return err
|
||||
}
|
||||
|
||||
for _, cluster := range clusters {
|
||||
clonedNamespaced := namespaceObj.DeepCopy()
|
||||
errChan := make(chan error, len(clusters))
|
||||
for i := range clusters {
|
||||
go func(cluster *clusterv1alpha1.Cluster, ch chan<- error) {
|
||||
clonedNamespaced := namespaceObj.DeepCopy()
|
||||
|
||||
// namespace only care about ClusterOverridePolicy
|
||||
cops, _, err := c.OverrideManager.ApplyOverridePolicies(clonedNamespaced, cluster.Name)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to apply overrides for %s/%s/%s, err is: %v", clonedNamespaced.GetKind(), clonedNamespaced.GetNamespace(), clonedNamespaced.GetName(), err)
|
||||
return err
|
||||
}
|
||||
// namespace only care about ClusterOverridePolicy
|
||||
cops, _, err := c.OverrideManager.ApplyOverridePolicies(clonedNamespaced, cluster.Name)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to apply overrides for %s/%s/%s, err is: %v", clonedNamespaced.GetKind(), clonedNamespaced.GetNamespace(), clonedNamespaced.GetName(), err)
|
||||
ch <- fmt.Errorf("sync namespace(%s) to cluster(%s) failed due to: %v", clonedNamespaced.GetName(), cluster.GetName(), err)
|
||||
}
|
||||
|
||||
annotations, err := binding.RecordAppliedOverrides(cops, nil, nil)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to record appliedOverrides, Error: %v", err)
|
||||
return err
|
||||
}
|
||||
annotations, err := binding.RecordAppliedOverrides(cops, nil, nil)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to record appliedOverrides, Error: %v", err)
|
||||
ch <- fmt.Errorf("sync namespace(%s) to cluster(%s) failed due to: %v", clonedNamespaced.GetName(), cluster.GetName(), err)
|
||||
}
|
||||
|
||||
workNamespace, err := names.GenerateExecutionSpaceName(cluster.Name)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to generate execution space name for member cluster %s, err is %v", cluster.Name, err)
|
||||
return err
|
||||
}
|
||||
workNamespace, err := names.GenerateExecutionSpaceName(cluster.Name)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to generate execution space name for member cluster %s, err is %v", cluster.Name, err)
|
||||
ch <- fmt.Errorf("sync namespace(%s) to cluster(%s) failed due to: %v", clonedNamespaced.GetName(), cluster.GetName(), err)
|
||||
}
|
||||
|
||||
workName := names.GenerateWorkName(namespaceObj.GetKind(), namespaceObj.GetName(), namespaceObj.GetNamespace())
|
||||
objectMeta := metav1.ObjectMeta{
|
||||
Name: workName,
|
||||
Namespace: workNamespace,
|
||||
Finalizers: []string{util.ExecutionControllerFinalizer},
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
*metav1.NewControllerRef(namespace, namespace.GroupVersionKind()),
|
||||
},
|
||||
Annotations: annotations,
|
||||
}
|
||||
workName := names.GenerateWorkName(namespaceObj.GetKind(), namespaceObj.GetName(), namespaceObj.GetNamespace())
|
||||
objectMeta := metav1.ObjectMeta{
|
||||
Name: workName,
|
||||
Namespace: workNamespace,
|
||||
Finalizers: []string{util.ExecutionControllerFinalizer},
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
*metav1.NewControllerRef(namespace, namespace.GroupVersionKind()),
|
||||
},
|
||||
Annotations: annotations,
|
||||
}
|
||||
|
||||
util.MergeLabel(clonedNamespaced, workv1alpha1.WorkNamespaceLabel, workNamespace)
|
||||
util.MergeLabel(clonedNamespaced, workv1alpha1.WorkNameLabel, workName)
|
||||
util.MergeLabel(clonedNamespaced, workv1alpha1.WorkNamespaceLabel, workNamespace)
|
||||
util.MergeLabel(clonedNamespaced, workv1alpha1.WorkNameLabel, workName)
|
||||
|
||||
if err = helper.CreateOrUpdateWork(c.Client, objectMeta, clonedNamespaced); err != nil {
|
||||
return err
|
||||
if err = helper.CreateOrUpdateWork(c.Client, objectMeta, clonedNamespaced); err != nil {
|
||||
ch <- fmt.Errorf("sync namespace(%s) to cluster(%s) failed due to: %v", clonedNamespaced.GetName(), cluster.GetName(), err)
|
||||
}
|
||||
ch <- nil
|
||||
}(&clusters[i], errChan)
|
||||
}
|
||||
|
||||
errs := []error{}
|
||||
for range clusters {
|
||||
if err := <-errChan; err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
// SetupWithManager creates a controller and register to controller manager.
|
||||
|
|
Loading…
Reference in New Issue