add retaining webhook
Signed-off-by: guoyao <1015105054@qq.com>
This commit is contained in:
parent 8fd235f12e
commit 81f6cb939b
@@ -8,6 +8,7 @@ import (
 	"github.com/spf13/cobra"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/dynamic"
 	kubeclientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
@@ -19,6 +20,7 @@ import (
 	"github.com/karmada-io/karmada/pkg/controllers/execution"
 	"github.com/karmada-io/karmada/pkg/controllers/mcs"
 	"github.com/karmada-io/karmada/pkg/controllers/status"
+	"github.com/karmada-io/karmada/pkg/crdexplorer"
 	"github.com/karmada-io/karmada/pkg/karmadactl"
 	"github.com/karmada-io/karmada/pkg/util"
 	"github.com/karmada-io/karmada/pkg/util/gclient"
@@ -126,7 +128,15 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 		klog.Fatalf("Failed to setup cluster status controller: %v", err)
 	}
 
-	objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent)
+	restConfig := mgr.GetConfig()
+	dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
+	controlPlaneInformerManager := informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
+	crdExplorer := crdexplorer.NewCustomResourceExplorer("", controlPlaneInformerManager)
+	if err := mgr.Add(crdExplorer); err != nil {
+		klog.Fatalf("Failed to setup custom resource explorer: %v", err)
+	}
+
+	objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, crdExplorer)
 	executionController := &execution.Controller{
 		Client:        mgr.GetClient(),
 		EventRecorder: mgr.GetEventRecorderFor(execution.ControllerName),
@@ -109,7 +109,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 	restConfig := mgr.GetConfig()
 	dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
 	discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
-	objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet)
 
 	overrideManager := overridemanager.New(mgr.GetClient())
 	skippedResourceConfig := util.NewSkippedResourceConfig()
 	if err := skippedResourceConfig.Parse(opts.SkippedPropagatingAPIs); err != nil {
@@ -129,6 +129,8 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
 		klog.Fatalf("Failed to setup custom resource explorer: %v", err)
 	}
 
+	objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, crdExplorer)
+
 	resourceDetector := &detector.ResourceDetector{
 		DiscoveryClientSet: discoverClientSet,
 		Client:             mgr.GetClient(),
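Note the ordering these hunks establish: the objectWatcher construction moves below the crdExplorer setup, because the watcher now consults the explorer when retaining fields. A condensed sketch of the resulting wiring, reusing only calls that appear in the hunks above (error handling and unrelated controller setup elided):

	restConfig := mgr.GetConfig()
	dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)

	// The explorer watches the control plane through its own informer manager.
	controlPlaneInformerManager := informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
	crdExplorer := crdexplorer.NewCustomResourceExplorer("", controlPlaneInformerManager)

	// The explorer is a runnable managed by controller-runtime; register it first.
	if err := mgr.Add(crdExplorer); err != nil {
		klog.Fatalf("Failed to setup custom resource explorer: %v", err)
	}

	// Only now can the watcher be built: it needs the explorer for retentions.
	objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, crdExplorer)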
@@ -5,6 +5,7 @@ import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/klog/v2"
 
 	configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
@@ -14,7 +15,7 @@ import (
 // for exploring common resource.
 type DefaultExplorer struct {
 	replicaHandlers   map[schema.GroupVersionKind]replicaExplorer
-	retentionHandlers map[schema.GroupVersionKind]RetentionExplorer
+	retentionHandlers map[schema.GroupVersionKind]retentionExplorer
 }
 
 // NewDefaultExplorer return a new DefaultExplorer.
@@ -39,6 +40,8 @@ func (e *DefaultExplorer) HookEnabled(kind schema.GroupVersionKind, operationTyp
 
 		// TODO(RainbowMango): more cases should be added here
 	}
+
+	klog.V(4).Infof("Hook is not enabled: %q in %q is not supported.", kind, operationType)
 	return false
 }
 
@@ -55,7 +58,7 @@ func (e *DefaultExplorer) GetReplicas(object *unstructured.Unstructured) (int32,
 func (e *DefaultExplorer) Retain(desired *unstructured.Unstructured, observed *unstructured.Unstructured) (retained *unstructured.Unstructured, err error) {
 	handler, exist := e.retentionHandlers[desired.GroupVersionKind()]
 	if !exist {
-		return nil, fmt.Errorf("default explorer for operation %s not found", configv1alpha1.ExploreRetaining)
+		return nil, fmt.Errorf("default retain explorer for %q not found", desired.GroupVersionKind())
 	}
 
 	return handler(desired, observed)
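The Retain method above is a plain table dispatch: handlers are keyed by GroupVersionKind and looked up per object. A self-contained sketch of the same pattern; the Service registration and the clusterIP handler body below are illustrative stand-ins, not Karmada API:

	package sketch

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
		"k8s.io/apimachinery/pkg/runtime/schema"
	)

	// retainFunc mirrors the retentionExplorer signature introduced in this commit.
	type retainFunc func(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error)

	// handlers is keyed by GVK, like DefaultExplorer.retentionHandlers.
	var handlers = map[schema.GroupVersionKind]retainFunc{
		{Version: "v1", Kind: "Service"}: func(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
			// Keep the cluster-assigned clusterIP, in the spirit of retainServiceClusterIP.
			if ip, ok, err := unstructured.NestedString(observed.Object, "spec", "clusterIP"); err == nil && ok && ip != "" {
				if err := unstructured.SetNestedField(desired.Object, ip, "spec", "clusterIP"); err != nil {
					return nil, err
				}
			}
			return desired, nil
		},
	}

	// retain dispatches the same way DefaultExplorer.Retain does: look up by GVK, fail if absent.
	func retain(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
		handler, exist := handlers[desired.GroupVersionKind()]
		if !exist {
			return nil, fmt.Errorf("default retain explorer for %q not found", desired.GroupVersionKind())
		}
		return handler(desired, observed)
	}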
@@ -1,17 +1,226 @@
 package defaultexplorer
 
 import (
+	"fmt"
+
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+
+	"github.com/karmada-io/karmada/pkg/util"
+	"github.com/karmada-io/karmada/pkg/util/helper"
 )
 
-// RetentionExplorer is the function that retains values from "Cluster" object.
-type RetentionExplorer func(desired *unstructured.Unstructured, cluster *unstructured.Unstructured) (retained *unstructured.Unstructured, err error)
+// retentionExplorer is the function that retains values from "observed" object.
+type retentionExplorer func(desired *unstructured.Unstructured, observed *unstructured.Unstructured) (retained *unstructured.Unstructured, err error)
 
-func getAllDefaultRetentionExplorer() map[schema.GroupVersionKind]RetentionExplorer {
-	explorers := make(map[schema.GroupVersionKind]RetentionExplorer)
-
-	// TODO(RainbowMango): Migrate retention functions from
-	// https://github.com/karmada-io/karmada/blob/master/pkg/util/objectwatcher/retain.go
+func getAllDefaultRetentionExplorer() map[schema.GroupVersionKind]retentionExplorer {
+	explorers := make(map[schema.GroupVersionKind]retentionExplorer)
+	explorers[corev1.SchemeGroupVersion.WithKind(util.PodKind)] = retainPodFields
+	explorers[corev1.SchemeGroupVersion.WithKind(util.ServiceKind)] = retainServiceFields
+	explorers[corev1.SchemeGroupVersion.WithKind(util.ServiceAccountKind)] = retainServiceAccountFields
+	explorers[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeClaimKind)] = retainPersistentVolumeClaimFields
+	explorers[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = retainJobSelectorFields
 	return explorers
 }
 
+/*
+This code is directly lifted from the kubefed codebase. It's a list of functions to update the desired object with values retained
+from the cluster object.
+For reference: https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/sync/dispatch/retain.go#L27-L133
+*/
+
+const (
+	// SecretsField indicates the 'secrets' field of a service account
+	SecretsField = "secrets"
+)
+
+func retainPodFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+	desiredPod, err := helper.ConvertToPod(desired)
+	if err != nil {
+		return nil, fmt.Errorf("failed to convert desiredPod from unstructured object: %v", err)
+	}
+
+	clusterPod, err := helper.ConvertToPod(observed)
+	if err != nil {
+		return nil, fmt.Errorf("failed to convert clusterPod from unstructured object: %v", err)
+	}
+
+	desiredPod.Spec.NodeName = clusterPod.Spec.NodeName
+	desiredPod.Spec.ServiceAccountName = clusterPod.Spec.ServiceAccountName
+	desiredPod.Spec.Volumes = clusterPod.Spec.Volumes
+	// retain volumeMounts in each container
+	for _, clusterContainer := range clusterPod.Spec.Containers {
+		for desiredIndex, desiredContainer := range desiredPod.Spec.Containers {
+			if desiredContainer.Name == clusterContainer.Name {
+				desiredPod.Spec.Containers[desiredIndex].VolumeMounts = clusterContainer.VolumeMounts
+				break
+			}
+		}
+	}
+
+	unCastObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(desiredPod)
+	if err != nil {
+		return nil, fmt.Errorf("failed to transform Pod: %v", err)
+	}
+
+	desired.Object = unCastObj
+	return desired, nil
+}
+
+// retainServiceFields updates the desired service object with values retained from the cluster object.
+func retainServiceFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+	// healthCheckNodePort is allocated by APIServer and unchangeable, so it should be retained while updating
+	if err := retainServiceHealthCheckNodePort(desired, observed); err != nil {
+		return nil, err
+	}
+
+	// ClusterIP and NodePort are allocated to Service by cluster, so retain the same if any while updating
+	if err := retainServiceClusterIP(desired, observed); err != nil {
+		return nil, err
+	}
+
+	if err := retainServiceNodePort(desired, observed); err != nil {
+		return nil, err
+	}
+
+	return desired, nil
+}
+
+func retainServiceHealthCheckNodePort(desired, observed *unstructured.Unstructured) error {
+	healthCheckNodePort, ok, err := unstructured.NestedInt64(observed.Object, "spec", "healthCheckNodePort")
+	if err != nil {
+		return fmt.Errorf("error retrieving healthCheckNodePort from service: %w", err)
+	}
+	if ok && healthCheckNodePort > 0 {
+		if err = unstructured.SetNestedField(desired.Object, healthCheckNodePort, "spec", "healthCheckNodePort"); err != nil {
+			return fmt.Errorf("error setting healthCheckNodePort for service: %w", err)
+		}
+	}
+	return nil
+}
+
+func retainServiceClusterIP(desired, observed *unstructured.Unstructured) error {
+	clusterIP, ok, err := unstructured.NestedString(observed.Object, "spec", "clusterIP")
+	if err != nil {
+		return fmt.Errorf("error retrieving clusterIP from cluster service: %w", err)
+	}
+	// !ok could indicate that a cluster ip was not assigned
+	if ok && clusterIP != "" {
+		err = unstructured.SetNestedField(desired.Object, clusterIP, "spec", "clusterIP")
+		if err != nil {
+			return fmt.Errorf("error setting clusterIP for service: %w", err)
+		}
+	}
+	return nil
+}
+
+func retainServiceNodePort(desired, observed *unstructured.Unstructured) error {
+	clusterPorts, ok, err := unstructured.NestedSlice(observed.Object, "spec", "ports")
+	if err != nil {
+		return fmt.Errorf("error retrieving ports from cluster service: %w", err)
+	}
+	if !ok {
+		return nil
+	}
+
+	var desiredPorts []interface{}
+	desiredPorts, ok, err = unstructured.NestedSlice(desired.Object, "spec", "ports")
+	if err != nil {
+		return fmt.Errorf("error retrieving ports from service: %w", err)
+	}
+	if !ok {
+		desiredPorts = []interface{}{}
+	}
+	for desiredIndex := range desiredPorts {
+		for clusterIndex := range clusterPorts {
+			fPort := desiredPorts[desiredIndex].(map[string]interface{})
+			cPort := clusterPorts[clusterIndex].(map[string]interface{})
+			if !(fPort["name"] == cPort["name"] && fPort["protocol"] == cPort["protocol"] && fPort["port"] == cPort["port"]) {
+				continue
+			}
+			nodePort, ok := cPort["nodePort"]
+			if ok {
+				fPort["nodePort"] = nodePort
+			}
+		}
+	}
+	err = unstructured.SetNestedSlice(desired.Object, desiredPorts, "spec", "ports")
+	if err != nil {
+		return fmt.Errorf("error setting ports for service: %w", err)
+	}
+
+	return nil
+}
+
+// retainServiceAccountFields retains the 'secrets' field of a service account
+// if the desired representation does not include a value for the field. This
+// ensures that the sync controller doesn't continually clear a generated
+// secret from a service account, prompting continual regeneration by the
+// service account controller in the member cluster.
+func retainServiceAccountFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+	// Check whether the secrets field is populated in the desired object.
+	desiredSecrets, ok, err := unstructured.NestedSlice(desired.Object, SecretsField)
+	if err != nil {
+		return nil, fmt.Errorf("error retrieving secrets from desired service account: %w", err)
+	}
+	if ok && len(desiredSecrets) > 0 {
+		// Field is populated, so an update to the target resource does not
+		// risk triggering a race with the service account controller.
+		return desired, nil
+	}
+
+	// Retrieve the secrets from the cluster object and retain them.
+	secrets, ok, err := unstructured.NestedSlice(observed.Object, SecretsField)
+	if err != nil {
+		return nil, fmt.Errorf("error retrieving secrets from service account: %w", err)
+	}
+	if ok && len(secrets) > 0 {
+		err := unstructured.SetNestedField(desired.Object, secrets, SecretsField)
+		if err != nil {
+			return nil, fmt.Errorf("error setting secrets for service account: %w", err)
+		}
+	}
+	return desired, nil
+}
+
+func retainPersistentVolumeClaimFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+	// volumeName is allocated by member cluster and unchangeable, so it should be retained while updating
+	volumeName, ok, err := unstructured.NestedString(observed.Object, "spec", "volumeName")
+	if err != nil {
+		return nil, fmt.Errorf("error retrieving volumeName from pvc: %w", err)
+	}
+	if ok && len(volumeName) > 0 {
+		if err = unstructured.SetNestedField(desired.Object, volumeName, "spec", "volumeName"); err != nil {
+			return nil, fmt.Errorf("error setting volumeName for pvc: %w", err)
+		}
+	}
+	return desired, nil
+}
+
+func retainJobSelectorFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+	matchLabels, exist, err := unstructured.NestedStringMap(observed.Object, "spec", "selector", "matchLabels")
+	if err != nil {
+		return nil, err
+	}
+	if exist {
+		err = unstructured.SetNestedStringMap(desired.Object, matchLabels, "spec", "selector", "matchLabels")
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	templateLabels, exist, err := unstructured.NestedStringMap(observed.Object, "spec", "template", "metadata", "labels")
+	if err != nil {
+		return nil, err
+	}
+	if exist {
+		err = unstructured.SetNestedStringMap(desired.Object, templateLabels, "spec", "template", "metadata", "labels")
+		if err != nil {
+			return nil, err
+		}
+	}
+	return desired, nil
+}
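To make the retention semantics concrete, here is a runnable, stand-alone demonstration of the Service clusterIP rule using only apimachinery's unstructured helpers; the hand-built objects are stand-ins for what the controller would actually read from the control plane and the member cluster:

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	)

	func main() {
		// "desired" is what Karmada wants to apply; it carries no clusterIP.
		desired := &unstructured.Unstructured{Object: map[string]interface{}{
			"apiVersion": "v1",
			"kind":       "Service",
			"metadata":   map[string]interface{}{"name": "demo"},
			"spec":       map[string]interface{}{"type": "ClusterIP"},
		}}
		// "observed" is the live object; the member cluster assigned the IP.
		observed := &unstructured.Unstructured{Object: map[string]interface{}{
			"apiVersion": "v1",
			"kind":       "Service",
			"metadata":   map[string]interface{}{"name": "demo"},
			"spec":       map[string]interface{}{"type": "ClusterIP", "clusterIP": "10.96.0.42"},
		}}

		// Same retention rule as retainServiceClusterIP: copy the assigned IP back
		// into the desired object so the update doesn't try to clear it.
		if ip, ok, err := unstructured.NestedString(observed.Object, "spec", "clusterIP"); err == nil && ok && ip != "" {
			if err := unstructured.SetNestedField(desired.Object, ip, "spec", "clusterIP"); err != nil {
				panic(err)
			}
		}

		ip, _, _ := unstructured.NestedString(desired.Object, "spec", "clusterIP")
		fmt.Println("retained clusterIP:", ip) // retained clusterIP: 10.96.0.42
	}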
@@ -14,7 +14,9 @@ import (
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
+	configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
 	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
+	"github.com/karmada-io/karmada/pkg/crdexplorer"
 	"github.com/karmada-io/karmada/pkg/util"
 	"github.com/karmada-io/karmada/pkg/util/restmapper"
 )
@@ -41,15 +43,17 @@ type objectWatcherImpl struct {
 	KubeClientSet        client.Client
 	VersionRecord        map[string]map[string]string
 	ClusterClientSetFunc ClientSetFunc
+	resourceExplorer     crdexplorer.CustomResourceExplorer
 }
 
 // NewObjectWatcher returns an instance of ObjectWatcher
-func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, clusterClientSetFunc ClientSetFunc) ObjectWatcher {
+func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, clusterClientSetFunc ClientSetFunc, explorer crdexplorer.CustomResourceExplorer) ObjectWatcher {
 	return &objectWatcherImpl{
 		KubeClientSet:        kubeClientSet,
 		VersionRecord:        make(map[string]map[string]string),
 		RESTMapper:           restMapper,
 		ClusterClientSetFunc: clusterClientSetFunc,
 		resourceExplorer:     explorer,
 	}
 }
@@ -97,6 +101,25 @@ func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.U
 	return nil
 }
 
+func (o *objectWatcherImpl) retainClusterFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+	// Pass the same ResourceVersion as in the cluster object for update operation, otherwise operation will fail.
+	desired.SetResourceVersion(observed.GetResourceVersion())
+
+	// Retain finalizers since they will typically be set by
+	// controllers in a member cluster. It is still possible to set the fields
+	// via overrides.
+	desired.SetFinalizers(observed.GetFinalizers())
+	// Merge annotations since they will typically be set by controllers in a member cluster
+	// and be set by user in karmada-controller-plane.
+	util.MergeAnnotations(desired, observed)
+
+	if o.resourceExplorer.HookEnabled(desired, configv1alpha1.ExploreRetaining) {
+		return o.resourceExplorer.Retain(desired, observed)
+	}
+
+	return desired, nil
+}
+
 func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error {
 	dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet)
 	if err != nil {
@@ -110,7 +133,7 @@ func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *un
 		return err
 	}
 
-	err = RetainClusterFields(desireObj, clusterObj)
+	desireObj, err = o.retainClusterFields(desireObj, clusterObj)
 	if err != nil {
 		klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s) in cluster %s: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err)
 		return err
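retainClusterFields applies three unconditional retentions before consulting the hook: resourceVersion (an update without it is rejected by the API server), finalizers, and merged annotations. A minimal paraphrase of that generic step; the annotation-merge precedence below (desired wins on conflict) is an assumption, and util.MergeAnnotations is the actual implementation:

	package sketch

	import (
		"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	)

	// retainCommon mirrors the hook-independent part of retainClusterFields.
	func retainCommon(desired, observed *unstructured.Unstructured) {
		// The API server rejects updates whose resourceVersion doesn't match.
		desired.SetResourceVersion(observed.GetResourceVersion())

		// Finalizers are typically owned by member-cluster controllers.
		desired.SetFinalizers(observed.GetFinalizers())

		// Merge annotations: start from observed, overlay desired.
		// NOTE: the conflict rule here is assumed, not Karmada's exact behavior.
		merged := observed.GetAnnotations()
		if merged == nil {
			merged = map[string]string{}
		}
		for k, v := range desired.GetAnnotations() {
			merged[k] = v
		}
		desired.SetAnnotations(merged)
	}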
@@ -1,240 +0,0 @@
-package objectwatcher
-
-import (
-	"fmt"
-
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/runtime"
-
-	"github.com/karmada-io/karmada/pkg/util"
-	"github.com/karmada-io/karmada/pkg/util/helper"
-)
-
-/*
-This code is directly lifted from the kubefed codebase. It's a list of functions to update the desired object with values retained
-from the cluster object.
-For reference: https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/sync/dispatch/retain.go#L27-L133
-*/
-
-const (
-	// SecretsField indicates the 'secrets' field of a service account
-	SecretsField = "secrets"
-)
-
-// RetainClusterFields updates the desired object with values retained
-// from the cluster object.
-func RetainClusterFields(desiredObj, clusterObj *unstructured.Unstructured) error {
-	targetKind := desiredObj.GetKind()
-	// Pass the same ResourceVersion as in the cluster object for update operation, otherwise operation will fail.
-	desiredObj.SetResourceVersion(clusterObj.GetResourceVersion())
-
-	// Retain finalizers since they will typically be set by
-	// controllers in a member cluster. It is still possible to set the fields
-	// via overrides.
-	desiredObj.SetFinalizers(clusterObj.GetFinalizers())
-	// Merge annotations since they will typically be set by controllers in a member cluster
-	// and be set by user in karmada-controller-plane.
-	util.MergeAnnotations(desiredObj, clusterObj)
-
-	switch targetKind {
-	case util.PodKind:
-		return retainPodFields(desiredObj, clusterObj)
-	case util.ServiceKind:
-		return retainServiceFields(desiredObj, clusterObj)
-	case util.ServiceAccountKind:
-		return retainServiceAccountFields(desiredObj, clusterObj)
-	case util.PersistentVolumeClaimKind:
-		return retainPersistentVolumeClaimFields(desiredObj, clusterObj)
-	case util.JobKind:
-		return retainJobSelectorFields(desiredObj, clusterObj)
-	}
-	return nil
-}
-
-func retainPodFields(desiredObj, clusterObj *unstructured.Unstructured) error {
-	desiredPod, err := helper.ConvertToPod(desiredObj)
-	if err != nil {
-		return fmt.Errorf("failed to convert desiredPod from unstructured object: %v", err)
-	}
-
-	clusterPod, err := helper.ConvertToPod(clusterObj)
-	if err != nil {
-		return fmt.Errorf("failed to convert clusterPod from unstructured object: %v", err)
-	}
-
-	desiredPod.Spec.NodeName = clusterPod.Spec.NodeName
-	desiredPod.Spec.ServiceAccountName = clusterPod.Spec.ServiceAccountName
-	desiredPod.Spec.Volumes = clusterPod.Spec.Volumes
-	// retain volumeMounts in each container
-	for _, clusterContainer := range clusterPod.Spec.Containers {
-		for desiredIndex, desiredContainer := range desiredPod.Spec.Containers {
-			if desiredContainer.Name == clusterContainer.Name {
-				desiredPod.Spec.Containers[desiredIndex].VolumeMounts = clusterContainer.VolumeMounts
-				break
-			}
-		}
-	}
-
-	unCastObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(desiredPod)
-	if err != nil {
-		return fmt.Errorf("failed to transform Pod: %v", err)
-	}
-
-	desiredObj.Object = unCastObj
-	return nil
-}
-
-// retainServiceFields updates the desired service object with values retained from the cluster object.
-func retainServiceFields(desiredObj, clusterObj *unstructured.Unstructured) error {
-	// healthCheckNodePort is allocated by APIServer and unchangeable, so it should be retained while updating
-	if err := retainServiceHealthCheckNodePort(desiredObj, clusterObj); err != nil {
-		return err
-	}
-
-	// ClusterIP and NodePort are allocated to Service by cluster, so retain the same if any while updating
-	if err := retainServiceClusterIP(desiredObj, clusterObj); err != nil {
-		return err
-	}
-
-	if err := retainServiceNodePort(desiredObj, clusterObj); err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func retainServiceHealthCheckNodePort(desiredObj, clusterObj *unstructured.Unstructured) error {
-	healthCheckNodePort, ok, err := unstructured.NestedInt64(clusterObj.Object, "spec", "healthCheckNodePort")
-	if err != nil {
-		return fmt.Errorf("error retrieving healthCheckNodePort from service: %w", err)
-	}
-	if ok && healthCheckNodePort > 0 {
-		if err = unstructured.SetNestedField(desiredObj.Object, healthCheckNodePort, "spec", "healthCheckNodePort"); err != nil {
-			return fmt.Errorf("error setting healthCheckNodePort for service: %w", err)
-		}
-	}
-	return nil
-}
-
-func retainServiceClusterIP(desiredObj, clusterObj *unstructured.Unstructured) error {
-	clusterIP, ok, err := unstructured.NestedString(clusterObj.Object, "spec", "clusterIP")
-	if err != nil {
-		return fmt.Errorf("error retrieving clusterIP from cluster service: %w", err)
-	}
-	// !ok could indicate that a cluster ip was not assigned
-	if ok && clusterIP != "" {
-		err = unstructured.SetNestedField(desiredObj.Object, clusterIP, "spec", "clusterIP")
-		if err != nil {
-			return fmt.Errorf("error setting clusterIP for service: %w", err)
-		}
-	}
-	return nil
-}
-
-func retainServiceNodePort(desiredObj, clusterObj *unstructured.Unstructured) error {
-	clusterPorts, ok, err := unstructured.NestedSlice(clusterObj.Object, "spec", "ports")
-	if err != nil {
-		return fmt.Errorf("error retrieving ports from cluster service: %w", err)
-	}
-	if !ok {
-		return nil
-	}
-
-	var desiredPorts []interface{}
-	desiredPorts, ok, err = unstructured.NestedSlice(desiredObj.Object, "spec", "ports")
-	if err != nil {
-		return fmt.Errorf("error retrieving ports from service: %w", err)
-	}
-	if !ok {
-		desiredPorts = []interface{}{}
-	}
-	for desiredIndex := range desiredPorts {
-		for clusterIndex := range clusterPorts {
-			fPort := desiredPorts[desiredIndex].(map[string]interface{})
-			cPort := clusterPorts[clusterIndex].(map[string]interface{})
-			if !(fPort["name"] == cPort["name"] && fPort["protocol"] == cPort["protocol"] && fPort["port"] == cPort["port"]) {
-				continue
-			}
-			nodePort, ok := cPort["nodePort"]
-			if ok {
-				fPort["nodePort"] = nodePort
-			}
-		}
-	}
-	err = unstructured.SetNestedSlice(desiredObj.Object, desiredPorts, "spec", "ports")
-	if err != nil {
-		return fmt.Errorf("error setting ports for service: %w", err)
-	}
-
-	return nil
-}
-
-// retainServiceAccountFields retains the 'secrets' field of a service account
-// if the desired representation does not include a value for the field. This
-// ensures that the sync controller doesn't continually clear a generated
-// secret from a service account, prompting continual regeneration by the
-// service account controller in the member cluster.
-func retainServiceAccountFields(desiredObj, clusterObj *unstructured.Unstructured) error {
-	// Check whether the secrets field is populated in the desired object.
-	desiredSecrets, ok, err := unstructured.NestedSlice(desiredObj.Object, SecretsField)
-	if err != nil {
-		return fmt.Errorf("error retrieving secrets from desired service account: %w", err)
-	}
-	if ok && len(desiredSecrets) > 0 {
-		// Field is populated, so an update to the target resource does not
-		// risk triggering a race with the service account controller.
-		return nil
-	}
-
-	// Retrieve the secrets from the cluster object and retain them.
-	secrets, ok, err := unstructured.NestedSlice(clusterObj.Object, SecretsField)
-	if err != nil {
-		return fmt.Errorf("error retrieving secrets from service account: %w", err)
-	}
-	if ok && len(secrets) > 0 {
-		err := unstructured.SetNestedField(desiredObj.Object, secrets, SecretsField)
-		if err != nil {
-			return fmt.Errorf("error setting secrets for service account: %w", err)
-		}
-	}
-	return nil
-}
-
-func retainPersistentVolumeClaimFields(desiredObj, clusterObj *unstructured.Unstructured) error {
-	// volumeName is allocated by member cluster and unchangeable, so it should be retained while updating
-	volumeName, ok, err := unstructured.NestedString(clusterObj.Object, "spec", "volumeName")
-	if err != nil {
-		return fmt.Errorf("error retrieving volumeName from pvc: %w", err)
-	}
-	if ok && len(volumeName) > 0 {
-		if err = unstructured.SetNestedField(desiredObj.Object, volumeName, "spec", "volumeName"); err != nil {
-			return fmt.Errorf("error setting volumeName for pvc: %w", err)
-		}
-	}
-	return nil
-}
-
-func retainJobSelectorFields(desiredObj, clusterObj *unstructured.Unstructured) error {
-	matchLabels, exist, err := unstructured.NestedStringMap(clusterObj.Object, "spec", "selector", "matchLabels")
-	if err != nil {
-		return err
-	}
-	if exist {
-		err = unstructured.SetNestedStringMap(desiredObj.Object, matchLabels, "spec", "selector", "matchLabels")
-		if err != nil {
-			return err
-		}
-	}
-
-	templateLabels, exist, err := unstructured.NestedStringMap(clusterObj.Object, "spec", "template", "metadata", "labels")
-	if err != nil {
-		return err
-	}
-	if exist {
-		err = unstructured.SetNestedStringMap(desiredObj.Object, templateLabels, "spec", "template", "metadata", "labels")
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}