add object watcher to manage operations for object dispatched to member clusters (#104)
Signed-off-by: lihanbo <lihanbo2@huawei.com>
parent b3b08d6d70
commit 143f09af1b
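
For orientation, here is a minimal sketch (not part of the commit) of the call pattern the diff below introduces: controllers dispatch workloads to member clusters through ObjectWatcher.Create/Update/Delete, and consult NeedsUpdate (which compares a recorded version against the observed object) before pushing an update. The helper name syncWorkload and the package name example are illustrative only; the ObjectWatcher interface and its methods come from the new objectwatcher package added in this commit.

package example

import (
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

	"github.com/karmada-io/karmada/pkg/util/objectwatcher"
)

// syncWorkload is a hypothetical helper showing the intended call pattern:
// Create on first dispatch, otherwise ask NeedsUpdate before calling Update.
func syncWorkload(ow objectwatcher.ObjectWatcher, clusterName string, desired, observed *unstructured.Unstructured) error {
	if observed == nil {
		// The workload has not been applied to the member cluster yet.
		return ow.Create(clusterName, desired)
	}

	needsUpdate, err := ow.NeedsUpdate(clusterName, desired, observed)
	if err != nil {
		return err
	}
	if needsUpdate {
		// Update retains cluster-managed fields (see retain.go below) before pushing.
		return ow.Update(clusterName, desired, observed)
	}
	return nil
}
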
@@ -24,6 +24,7 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/status"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
)

// aggregatedScheme aggregates all Kubernetes and extended schemes used by controllers.

@@ -92,6 +93,8 @@ func setupControllers(mgr controllerruntime.Manager, stopChan <-chan struct{}) {
karmadaClient := karmadaclientset.NewForConfigOrDie(resetConfig)
kubeClientSet := kubernetes.NewForConfigOrDie(resetConfig)

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), kubeClientSet, mgr.GetRESTMapper())

MemberClusterController := &membercluster.Controller{
Client: mgr.GetClient(),
KubeClientSet: kubeClientSet,

@@ -137,6 +140,7 @@ func setupControllers(mgr controllerruntime.Manager, stopChan <-chan struct{}) {
KubeClientSet: kubeClientSet,
EventRecorder: mgr.GetEventRecorderFor(execution.ControllerName),
RESTMapper: mgr.GetRESTMapper(),
ObjectWatcher: objectWatcher,
}
if err := executionController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup execution controller: %v", err)

@@ -151,6 +155,7 @@ func setupControllers(mgr controllerruntime.Manager, stopChan <-chan struct{}) {
InformerManager: informermanager.NewMultiClusterInformerManager(),
StopChan: stopChan,
WorkerNumber: 1,
ObjectWatcher: objectWatcher,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(mgr); err != nil {
@@ -5,7 +5,6 @@ import (
"fmt"

"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

@@ -21,6 +20,7 @@ import (
propagationstrategy "github.com/karmada-io/karmada/pkg/apis/propagationstrategy/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)

@@ -35,6 +35,7 @@ type Controller struct {
KubeClientSet kubernetes.Interface // used to get kubernetes resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
ObjectWatcher objectwatcher.ObjectWatcher
}

// Reconcile performs a full reconciliation for the object referred to by the Request.

@@ -119,11 +120,6 @@ func (c *Controller) tryDeleteWorkload(propagationWork *propagationstrategy.Prop
return nil
}

memberClusterDynamicClient, err := util.NewClusterDynamicClientSet(memberCluster, c.KubeClientSet)
if err != nil {
return err
}

for _, manifest := range propagationWork.Spec.Workload.Manifests {
workload := &unstructured.Unstructured{}
err := workload.UnmarshalJSON(manifest.Raw)

@@ -132,7 +128,7 @@ func (c *Controller) tryDeleteWorkload(propagationWork *propagationstrategy.Prop
return err
}

err = c.deleteResource(memberClusterDynamicClient, workload)
err = c.ObjectWatcher.Delete(memberClusterName, workload)
if err != nil {
klog.Errorf("Failed to delete resource in the given member cluster %v, err is %v", memberCluster.Name, err)
return err

@@ -183,16 +179,31 @@ func (c *Controller) syncToMemberClusters(memberCluster *v1alpha1.MemberCluster,
klog.Errorf("failed to unmarshal workload, error is: %v", err)
return err
}
c.setOwnerLabel(workload, propagationWork)

util.MergeLabel(workload, util.OwnerLabel, names.GenerateOwnerLabelValue(propagationWork.GetNamespace(), propagationWork.GetName()))

applied := c.isResourceApplied(&propagationWork.Status)
if applied {
err = c.updateResource(memberClusterDynamicClient, workload)
// todo: get memberClusterObj from cache
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper, workload.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to get resource(%s/%s) as mapping GVK to GVR failed: %v", workload.GetNamespace(), workload.GetName(), err)
return err
}

memberClusterObj, err := memberClusterDynamicClient.DynamicClientSet.Resource(dynamicResource).Namespace(workload.GetNamespace()).Get(context.TODO(), workload.GetName(), v1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get resource %v from member cluster, err is %v ", workload.GetName(), err)
return err
}

err = c.ObjectWatcher.Update(memberCluster.Name, memberClusterObj, workload)
if err != nil {
klog.Errorf("Failed to update resource in the given member cluster %s, err is %v", memberCluster.Name, err)
return err
}
} else {
err = c.createResource(memberClusterDynamicClient, workload)
err = c.ObjectWatcher.Create(memberCluster.Name, workload)
if err != nil {
klog.Errorf("Failed to create resource in the given member cluster %s, err is %v", memberCluster.Name, err)
return err

@@ -208,70 +219,6 @@ func (c *Controller) syncToMemberClusters(memberCluster *v1alpha1.MemberCluster,
return nil
}

// setOwnerLabel adds ownerLabel for workload that will be applied to member cluster.
func (c *Controller) setOwnerLabel(workload *unstructured.Unstructured, propagationWork *propagationstrategy.PropagationWork) {
workloadLabel := workload.GetLabels()
if workloadLabel == nil {
workloadLabel = make(map[string]string, 1)
}
workloadLabel[util.OwnerLabel] = names.GenerateOwnerLabelValue(propagationWork.GetNamespace(), propagationWork.GetName())
workload.SetLabels(workloadLabel)
}

// deleteResource delete resource in member cluster
func (c *Controller) deleteResource(memberClusterDynamicClient *util.DynamicClusterClient, workload *unstructured.Unstructured) error {
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper, workload.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to delete resource(%s/%s) as mapping GVK to GVR failed: %v", workload.GetNamespace(), workload.GetName(), err)
return err
}

err = memberClusterDynamicClient.DynamicClientSet.Resource(dynamicResource).Namespace(workload.GetNamespace()).Delete(context.TODO(), workload.GetName(), v1.DeleteOptions{})
if apierrors.IsNotFound(err) {
err = nil
}
if err != nil {
klog.Errorf("Failed to delete resource %v, err is %v ", workload.GetName(), err)
return err
}
return nil
}

// createResource create resource in member cluster
func (c *Controller) createResource(memberClusterDynamicClient *util.DynamicClusterClient, workload *unstructured.Unstructured) error {
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper, workload.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to create resource(%s/%s) as mapping GVK to GVR failed: %v", workload.GetNamespace(), workload.GetName(), err)
return err
}

_, err = memberClusterDynamicClient.DynamicClientSet.Resource(dynamicResource).Namespace(workload.GetNamespace()).Create(context.TODO(), workload, v1.CreateOptions{})
if err != nil {
if apierrors.IsAlreadyExists(err) {
return nil
}
klog.Errorf("Failed to create resource %v, err is %v ", workload.GetName(), err)
return err
}
return nil
}

// updateResource update resource in member cluster
func (c *Controller) updateResource(memberClusterDynamicClient *util.DynamicClusterClient, workload *unstructured.Unstructured) error {
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper, workload.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to update resource(%s/%s) as mapping GVK to GVR failed: %v", workload.GetNamespace(), workload.GetName(), err)
return err
}

_, err = memberClusterDynamicClient.DynamicClientSet.Resource(dynamicResource).Namespace(workload.GetNamespace()).Update(context.TODO(), workload, v1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update resource %v, err is %v ", workload.GetName(), err)
return err
}
return nil
}

// removeFinalizer remove finalizer from the given propagationWork
func (c *Controller) removeFinalizer(propagationWork *propagationstrategy.PropagationWork) (controllerruntime.Result, error) {
if !controllerutil.ContainsFinalizer(propagationWork, util.ExecutionControllerFinalizer) {
@@ -23,6 +23,7 @@ import (
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)

@@ -41,6 +42,7 @@ type PropagationWorkStatusController struct {
StopChan <-chan struct{}
WorkerNumber int // WorkerNumber is the number of worker goroutines
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
ObjectWatcher objectwatcher.ObjectWatcher
}

// Reconcile performs a full reconciliation for the object referred to by the Request.

@@ -128,10 +130,31 @@ func (c *PropagationWorkStatusController) syncPropagationWorkStatus(key string)
return err
}

// TODO: consult with version manager if current status needs update.
// consult with version manager if current status needs update.
desireObj, err := c.getRawManifest(workObject.Spec.Workload.Manifests, obj)
if err != nil {
return err
}

util.MergeLabel(desireObj, util.OwnerLabel, names.GenerateOwnerLabelValue(workObject.GetNamespace(), workObject.GetName()))

clusterName, err := names.GetMemberClusterName(ownerNamespace)
if err != nil {
klog.Errorf("Failed to get member cluster name: %v", err)
return err
}

// compare version to determine if need to update resource
needUpdate, err := c.ObjectWatcher.NeedsUpdate(clusterName, desireObj, obj)
if err != nil {
return err
}

if needUpdate {
return c.ObjectWatcher.Update(clusterName, desireObj, obj)
}

klog.Infof("reflecting %s(%s/%s) status of to PropagationWork(%s/%s)", obj.GetKind(), obj.GetNamespace(), obj.GetName(), ownerNamespace, ownerName)

return c.reflectStatus(workObject, obj)
}

@@ -230,6 +253,24 @@ func (c *PropagationWorkStatusController) getManifestIndex(manifests []v1alpha1.
return -1, fmt.Errorf("no such manifest exist")
}

func (c *PropagationWorkStatusController) getRawManifest(manifests []v1alpha1.Manifest, clusterObj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
for _, rawManifest := range manifests {
manifest := &unstructured.Unstructured{}
if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil {
return nil, err
}

if manifest.GetAPIVersion() == clusterObj.GetAPIVersion() &&
manifest.GetKind() == clusterObj.GetKind() &&
manifest.GetNamespace() == clusterObj.GetNamespace() &&
manifest.GetName() == clusterObj.GetName() {
return manifest, nil
}
}

return nil, fmt.Errorf("no such manifest exist")
}

// getObjectFromCache gets full object information from cache by key in worker queue.
func (c *PropagationWorkStatusController) getObjectFromCache(key string) (*unstructured.Unstructured, error) {
clusterWorkload, err := util.SplitMetaKey(key)
@@ -1,5 +1,9 @@
package util

import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// GetLabelValue retrieves the value via 'labelKey' if exist, otherwise returns an empty string.
func GetLabelValue(labels map[string]string, labelKey string) string {
if labels == nil {

@@ -8,3 +12,13 @@ func GetLabelValue(labels map[string]string, labelKey string) string {

return labels[labelKey]
}

// MergeLabel adds label for the given object.
func MergeLabel(obj *unstructured.Unstructured, labelKey string, labelValue string) {
workloadLabel := obj.GetLabels()
if workloadLabel == nil {
workloadLabel = make(map[string]string, 1)
}
workloadLabel[labelKey] = labelValue
obj.SetLabels(workloadLabel)
}
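
As a small aside (not part of the diff), the snippet below shows how MergeLabel behaves when the object carries no labels yet: the nil map is initialized before the key is set. The object contents are invented for illustration; util.OwnerLabel and names.GenerateOwnerLabelValue are the helpers used by the controllers in this commit.

package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

	"github.com/karmada-io/karmada/pkg/util"
	"github.com/karmada-io/karmada/pkg/util/names"
)

func demoMergeLabel() {
	obj := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "apps/v1",
		"kind":       "Deployment",
		"metadata":   map[string]interface{}{"name": "nginx", "namespace": "default"},
	}}
	// The labels map is nil at this point; MergeLabel initializes it before setting the key.
	util.MergeLabel(obj, util.OwnerLabel, names.GenerateOwnerLabelValue("default", "demo-work"))
	fmt.Println(obj.GetLabels()) // prints the owner label that was just merged
}
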
@@ -0,0 +1,256 @@
package objectwatcher

import (
"context"
"fmt"
"reflect"
"strings"
"sync"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)

const (
generationPrefix = "gen:"
resourceVersionPrefix = "rv:"
)

// ObjectWatcher manages operations for object dispatched to member clusters.
type ObjectWatcher interface {
Create(clusterName string, desireObj *unstructured.Unstructured) error
Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error
Delete(clusterName string, desireObj *unstructured.Unstructured) error
NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error)
}

type objectWatcherImpl struct {
client.Client
KubeClientSet kubernetes.Interface
VersionRecord map[string]map[string]string
RESTMapper meta.RESTMapper
Lock sync.RWMutex
}

// NewObjectWatcher returns an instance of ObjectWatcher
func NewObjectWatcher(client client.Client, kubeClientSet kubernetes.Interface, restMapper meta.RESTMapper) ObjectWatcher {
return &objectWatcherImpl{
Client: client,
KubeClientSet: kubeClientSet,
VersionRecord: make(map[string]map[string]string),
RESTMapper: restMapper,
}
}

func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.Unstructured) error {
klog.Infof("Start to create resource %v/%v", desireObj.GetNamespace(), desireObj.GetName())
dynamicClusterClient, err := util.BuildDynamicClusterClient(o.Client, o.KubeClientSet, clusterName)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
return err
}

gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to create resource(%s/%s) as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), err)
return err
}

clusterObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Create(context.TODO(), desireObj, v1.CreateOptions{})
if err != nil {
if apierrors.IsAlreadyExists(err) {
return nil
}
klog.Errorf("Failed to create resource %v, err is %v ", desireObj.GetName(), err)
return err
}

// record version
o.recordVersion(clusterObj, dynamicClusterClient.ClusterName)
return nil
}

func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error {
klog.Infof("Start to update resource %v/%v", desireObj.GetNamespace(), desireObj.GetName())
dynamicClusterClient, err := util.BuildDynamicClusterClient(o.Client, o.KubeClientSet, clusterName)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
return err
}

gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to update resource(%s/%s) as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), err)
return err
}

err = RetainClusterFields(desireObj, clusterObj)
if err != nil {
klog.Errorf("Failed to retain fields: %v", err)
return err
}

resource, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, v1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update resource %v/%v, err is %v ", desireObj.GetNamespace(), desireObj.GetName(), err)
return err
}

// record version
o.recordVersion(resource, clusterName)
return nil
}

func (o *objectWatcherImpl) Delete(clusterName string, desireObj *unstructured.Unstructured) error {
klog.Infof("Start to delete resource %v/%v", desireObj.GetNamespace(), desireObj.GetName())
dynamicClusterClient, err := util.BuildDynamicClusterClient(o.Client, o.KubeClientSet, clusterName)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
return err
}

gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to delete resource(%s/%s) as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), err)
return err
}

err = dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Delete(context.TODO(), desireObj.GetName(), v1.DeleteOptions{})
if apierrors.IsNotFound(err) {
err = nil
}
if err != nil {
klog.Errorf("Failed to delete resource %v, err is %v ", desireObj.GetName(), err)
return err
}

objectKey := o.genObjectKey(desireObj)
o.deleteVersionRecord(dynamicClusterClient.ClusterName, objectKey)

return nil
}

func (o *objectWatcherImpl) genObjectKey(obj *unstructured.Unstructured) string {
return obj.GroupVersionKind().String() + "/" + obj.GetNamespace() + "/" + obj.GetName()
}

// recordVersion will add or update resource version records
func (o *objectWatcherImpl) recordVersion(clusterObj *unstructured.Unstructured, clusterName string) {
objVersion := objectVersion(clusterObj)
objectKey := o.genObjectKey(clusterObj)
if o.isClusterVersionRecordExist(clusterName) {
o.updateVersionRecord(clusterName, objectKey, objVersion)
} else {
o.addVersionRecord(clusterName, objectKey, objVersion)
}
}

// isClusterVersionRecordExist checks if the version record map of given member cluster exists
func (o *objectWatcherImpl) isClusterVersionRecordExist(clusterName string) bool {
o.Lock.RLock()
defer o.Lock.RUnlock()

_, exist := o.VersionRecord[clusterName]

return exist
}

// getVersionRecord will return the recorded version of given resource (if it exists)
func (o *objectWatcherImpl) getVersionRecord(clusterName, resourceName string) (string, bool) {
o.Lock.RLock()
defer o.Lock.RUnlock()

version, exist := o.VersionRecord[clusterName][resourceName]
return version, exist
}

// addVersionRecord will add new version record of given resource
func (o *objectWatcherImpl) addVersionRecord(clusterName, resourceName, version string) {
o.Lock.Lock()
defer o.Lock.Unlock()
o.VersionRecord[clusterName] = map[string]string{resourceName: version}
}

// updateVersionRecord will update the recorded version of given resource
func (o *objectWatcherImpl) updateVersionRecord(clusterName, resourceName, version string) {
o.Lock.Lock()
defer o.Lock.Unlock()
o.VersionRecord[clusterName][resourceName] = version
}

// deleteVersionRecord will delete the recorded version of given resource
func (o *objectWatcherImpl) deleteVersionRecord(clusterName, resourceName string) {
o.Lock.Lock()
defer o.Lock.Unlock()
delete(o.VersionRecord[clusterName], resourceName)
}

func (o *objectWatcherImpl) NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error) {
// get resource version
version, exist := o.getVersionRecord(clusterName, desiredObj.GroupVersionKind().String()+"/"+desiredObj.GetNamespace()+"/"+desiredObj.GetName())
if !exist {
klog.Errorf("Failed to update resource %v/%v for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName())
return false, fmt.Errorf("failed to update resource %v/%v for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName())
}

return objectNeedsUpdate(desiredObj, clusterObj, version), nil
}

/*
This code is lifted from the kubefed codebase. It's a list of functions to determine whether the provided cluster
object needs to be updated according to the desired object and the recorded version.
For reference: https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L30-L59
*/

// objectVersion retrieves the field type-prefixed value used for
// determining currency of the given cluster object.
func objectVersion(clusterObj *unstructured.Unstructured) string {
generation := clusterObj.GetGeneration()
if generation != 0 {
return fmt.Sprintf("%s%d", generationPrefix, generation)
}
return fmt.Sprintf("%s%s", resourceVersionPrefix, clusterObj.GetResourceVersion())
}

// objectNeedsUpdate determines whether the provided cluster object
// needs to be updated according to the desired object and the
// recorded version.
func objectNeedsUpdate(desiredObj, clusterObj *unstructured.Unstructured, recordedVersion string) bool {
targetVersion := objectVersion(clusterObj)

if recordedVersion != targetVersion {
return true
}

// If versions match and the version is sourced from the
// generation field, a further check of metadata equivalency is
// required.
return strings.HasPrefix(targetVersion, generationPrefix) && !objectMetaObjEquivalent(desiredObj, clusterObj)
}

// objectMetaObjEquivalent checks if cluster-independent, user provided data in two given ObjectMeta are equal. If in
// the future the ObjectMeta structure is expanded then any field that is not populated
// by the api server should be included here.
func objectMetaObjEquivalent(a, b metav1.Object) bool {
if a.GetName() != b.GetName() {
return false
}
if a.GetNamespace() != b.GetNamespace() {
return false
}
aLabels := a.GetLabels()
bLabels := b.GetLabels()
if !reflect.DeepEqual(aLabels, bLabels) && (len(aLabels) != 0 || len(bLabels) != 0) {
return false
}
return true
}
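
To make the version bookkeeping concrete, here is a hypothetical check (not part of the commit, written as if it lived in a _test.go file of the objectwatcher package): any mismatch between the recorded version and the observed object's version triggers an update, and for generation-based versions a metadata difference alone is enough even when the generation matches. The helper newObj and all object values are invented for the example.

package objectwatcher

import (
	"testing"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// newObj builds an unstructured object with the given generation,
// resourceVersion and labels (all values are illustrative).
func newObj(generation int64, resourceVersion string, labels map[string]string) *unstructured.Unstructured {
	obj := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "apps/v1",
		"kind":       "Deployment",
		"metadata":   map[string]interface{}{"name": "demo", "namespace": "default"},
	}}
	obj.SetGeneration(generation)
	obj.SetResourceVersion(resourceVersion)
	obj.SetLabels(labels)
	return obj
}

func TestObjectNeedsUpdateSketch(t *testing.T) {
	observed := newObj(2, "1001", map[string]string{"app": "demo"})

	// Generation is non-zero, so the version key uses the "gen:" prefix.
	if objectVersion(observed) != "gen:2" {
		t.Fatalf("unexpected version key")
	}

	// Recorded version differs from the observed one -> update needed.
	if !objectNeedsUpdate(newObj(2, "1001", nil), observed, "gen:1") {
		t.Fatal("expected update when recorded generation is stale")
	}

	// Same generation, but desired labels differ from observed labels -> update needed.
	desired := newObj(2, "1001", map[string]string{"app": "demo", "extra": "label"})
	if !objectNeedsUpdate(desired, observed, "gen:2") {
		t.Fatal("expected update when metadata differs")
	}

	// Same generation and equivalent metadata -> no update.
	if objectNeedsUpdate(newObj(2, "1001", map[string]string{"app": "demo"}), observed, "gen:2") {
		t.Fatal("expected no update for equivalent objects")
	}
}
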
@@ -0,0 +1,128 @@
package objectwatcher

import (
"fmt"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

/*
This code is directly lifted from the kubefed codebase. It's a list of functions to update the desired object with values retained
from the cluster object.
For reference: https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/sync/dispatch/retain.go#L27-L133
*/

const (
// ServiceKind indicates the target resource is a service
ServiceKind = "Service"
// ServiceAccountKind indicates the target resource is a serviceaccount
ServiceAccountKind = "ServiceAccount"
// SecretsField indicates the 'secrets' field of a service account
SecretsField = "secrets"
)

// RetainClusterFields updates the desired object with values retained
// from the cluster object.
func RetainClusterFields(desiredObj, clusterObj *unstructured.Unstructured) error {
targetKind := desiredObj.GetKind()
// Pass the same ResourceVersion as in the cluster object for update operation, otherwise operation will fail.
desiredObj.SetResourceVersion(clusterObj.GetResourceVersion())

// Retain finalizers and annotations since they will typically be set by
// controllers in a member cluster. It is still possible to set the fields
// via overrides.
desiredObj.SetFinalizers(clusterObj.GetFinalizers())
desiredObj.SetAnnotations(clusterObj.GetAnnotations())

if targetKind == ServiceKind {
return retainServiceFields(desiredObj, clusterObj)
}
if targetKind == ServiceAccountKind {
return retainServiceAccountFields(desiredObj, clusterObj)
}
return nil
}

func retainServiceFields(desiredObj, clusterObj *unstructured.Unstructured) error {
// ClusterIP and NodePort are allocated to Service by cluster, so retain the same if any while updating

// Retain clusterip
clusterIP, ok, err := unstructured.NestedString(clusterObj.Object, "spec", "clusterIP")
if err != nil {
return fmt.Errorf("error retrieving clusterIP from cluster service: %w", err)
}
// !ok could indicate that a cluster ip was not assigned
if ok && clusterIP != "" {
err := unstructured.SetNestedField(desiredObj.Object, clusterIP, "spec", "clusterIP")
if err != nil {
return fmt.Errorf("error setting clusterIP for service: %w", err)
}
}

// Retain nodeports
clusterPorts, ok, err := unstructured.NestedSlice(clusterObj.Object, "spec", "ports")
if err != nil {
return fmt.Errorf("error retrieving ports from cluster service: %w", err)
}
if !ok {
return nil
}
var desiredPorts []interface{}
desiredPorts, ok, err = unstructured.NestedSlice(desiredObj.Object, "spec", "ports")
if err != nil {
return fmt.Errorf("error retrieving ports from service: %w", err)
}
if !ok {
desiredPorts = []interface{}{}
}
for desiredIndex := range desiredPorts {
for clusterIndex := range clusterPorts {
fPort := desiredPorts[desiredIndex].(map[string]interface{})
cPort := clusterPorts[clusterIndex].(map[string]interface{})
if !(fPort["name"] == cPort["name"] && fPort["protocol"] == cPort["protocol"] && fPort["port"] == cPort["port"]) {
continue
}
nodePort, ok := cPort["nodePort"]
if ok {
fPort["nodePort"] = nodePort
}
}
}
err = unstructured.SetNestedSlice(desiredObj.Object, desiredPorts, "spec", "ports")
if err != nil {
return fmt.Errorf("error setting ports for service: %w", err)
}

return nil
}

// retainServiceAccountFields retains the 'secrets' field of a service account
// if the desired representation does not include a value for the field. This
// ensures that the sync controller doesn't continually clear a generated
// secret from a service account, prompting continual regeneration by the
// service account controller in the member cluster.
func retainServiceAccountFields(desiredObj, clusterObj *unstructured.Unstructured) error {
// Check whether the secrets field is populated in the desired object.
desiredSecrets, ok, err := unstructured.NestedSlice(desiredObj.Object, SecretsField)
if err != nil {
return fmt.Errorf("error retrieving secrets from desired service account: %w", err)
}
if ok && len(desiredSecrets) > 0 {
// Field is populated, so an update to the target resource does not
// risk triggering a race with the service account controller.
return nil
}

// Retrieve the secrets from the cluster object and retain them.
secrets, ok, err := unstructured.NestedSlice(clusterObj.Object, SecretsField)
if err != nil {
return fmt.Errorf("error retrieving secrets from service account: %w", err)
}
if ok && len(secrets) > 0 {
err := unstructured.SetNestedField(desiredObj.Object, secrets, SecretsField)
if err != nil {
return fmt.Errorf("error setting secrets for service account: %w", err)
}
}
return nil
}
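
As a quick illustration (not part of the commit), the sketch below shows the effect of RetainClusterFields on a Service: the resourceVersion and the cluster-allocated clusterIP from the member cluster's copy are carried over into the desired object before the update is pushed. The helper name and all object contents are invented for the example.

package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

	"github.com/karmada-io/karmada/pkg/util/objectwatcher"
)

// demoRetainServiceFields shows RetainClusterFields carrying the member
// cluster's resourceVersion and allocated clusterIP into the desired object.
func demoRetainServiceFields() error {
	desired := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Service",
		"metadata":   map[string]interface{}{"name": "demo", "namespace": "default"},
		"spec":       map[string]interface{}{"type": "ClusterIP"},
	}}
	observed := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Service",
		"metadata":   map[string]interface{}{"name": "demo", "namespace": "default", "resourceVersion": "2049"},
		"spec":       map[string]interface{}{"type": "ClusterIP", "clusterIP": "10.96.0.42"},
	}}

	if err := objectwatcher.RetainClusterFields(desired, observed); err != nil {
		return err
	}

	clusterIP, _, _ := unstructured.NestedString(desired.Object, "spec", "clusterIP")
	fmt.Println(desired.GetResourceVersion(), clusterIP) // prints: 2049 10.96.0.42
	return nil
}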