From 9e7fde6ab9728ca98706d02257c07ae549c35d0f Mon Sep 17 00:00:00 2001 From: RainbowMango Date: Fri, 12 Nov 2021 09:53:34 +0800 Subject: [PATCH] Setup custom resource explorer Signed-off-by: RainbowMango --- .../app/controllermanager.go | 8 ++ pkg/crdexplorer/crdexplorer.go | 12 ++- .../customizedexplorer/customized.go | 6 ++ pkg/crdexplorer/defaultexplorer/default.go | 17 +++- pkg/crdexplorer/defaultexplorer/replica.go | 53 +++++++++-- pkg/detector/detector.go | 87 ++++++------------- pkg/util/helper/binding.go | 16 ++++ 7 files changed, 125 insertions(+), 74 deletions(-) diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 800e4ddd2..f7695dda8 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -28,6 +28,7 @@ import ( "github.com/karmada-io/karmada/pkg/controllers/mcs" "github.com/karmada-io/karmada/pkg/controllers/namespace" "github.com/karmada-io/karmada/pkg/controllers/status" + "github.com/karmada-io/karmada/pkg/crdexplorer" "github.com/karmada-io/karmada/pkg/detector" "github.com/karmada-io/karmada/pkg/karmadactl" "github.com/karmada-io/karmada/pkg/util" @@ -123,6 +124,12 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop } controlPlaneInformerManager := informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan) + + crdExplorer := crdexplorer.NewCustomResourceExplorer("", controlPlaneInformerManager) + if err := mgr.Add(crdExplorer); err != nil { + klog.Fatalf("Failed to setup custom resource explorer: %v", err) + } + resourceDetector := &detector.ResourceDetector{ DiscoveryClientSet: discoverClientSet, Client: mgr.GetClient(), @@ -131,6 +138,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop DynamicClient: dynamicClientSet, SkippedResourceConfig: skippedResourceConfig, SkippedPropagatingNamespaces: skippedPropagatingNamespaces, + ResourceExplorer: crdExplorer, } if err := mgr.Add(resourceDetector); err != nil { klog.Fatalf("Failed to setup resource detector: %v", err) diff --git a/pkg/crdexplorer/crdexplorer.go b/pkg/crdexplorer/crdexplorer.go index 96c78ff1c..e69e2810a 100644 --- a/pkg/crdexplorer/crdexplorer.go +++ b/pkg/crdexplorer/crdexplorer.go @@ -4,6 +4,7 @@ import ( "context" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/klog/v2" configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" @@ -17,6 +18,8 @@ import ( type CustomResourceExplorer interface { // Start starts running the component and will never stop running until the context is closed or an error occurs. Start(ctx context.Context) (err error) + // HookEnabled tells if any hook exist for specific resource type and operation type. + HookEnabled(kind schema.GroupVersionKind, operationType configv1alpha1.OperationType) bool // GetReplicas returns the desired replicas of the object as well as the requirements of each replica. GetReplicas(object runtime.Object) (replica int32, replicaRequires *workv1alpha2.ReplicaRequirements, err error) // GetHealthy tells if the object in healthy state. @@ -25,8 +28,8 @@ type CustomResourceExplorer interface { // other common method } -// NewCustomResourceExplore return a new CustomResourceExplorer object. -func NewCustomResourceExplore(kubeconfig string, informer informermanager.SingleClusterInformerManager) CustomResourceExplorer { +// NewCustomResourceExplorer builds a new CustomResourceExplorer object. +func NewCustomResourceExplorer(kubeconfig string, informer informermanager.SingleClusterInformerManager) CustomResourceExplorer { return &customResourceExplorerImpl{ kubeconfig: kubeconfig, informer: informer, @@ -58,6 +61,11 @@ func (i *customResourceExplorerImpl) Start(ctx context.Context) (err error) { return nil } +// HookEnabled tells if any hook exist for specific resource type and operation type. +func (i *customResourceExplorerImpl) HookEnabled(kind schema.GroupVersionKind, operationType configv1alpha1.OperationType) bool { + return i.customizedExplorer.HookEnabled(kind, operationType) || i.defaultExplorer.HookEnabled(kind, operationType) +} + // GetReplicas returns the desired replicas of the object as well as the requirements of each replica. func (i *customResourceExplorerImpl) GetReplicas(object runtime.Object) (replica int32, replicaRequires *workv1alpha2.ReplicaRequirements, err error) { var hookMatched bool diff --git a/pkg/crdexplorer/customizedexplorer/customized.go b/pkg/crdexplorer/customizedexplorer/customized.go index 8d71aad6c..183087a8a 100644 --- a/pkg/crdexplorer/customizedexplorer/customized.go +++ b/pkg/crdexplorer/customizedexplorer/customized.go @@ -41,6 +41,12 @@ func NewCustomizedExplorer(kubeconfig string, informer informermanager.SingleClu }, nil } +// HookEnabled tells if any hook exist for specific resource type and operation type. +func (e *CustomizedExplorer) HookEnabled(kind schema.GroupVersionKind, operationType configv1alpha1.OperationType) bool { + // TODO(RainbowMango): Check if any hook configured + return false +} + // GetReplicas returns the desired replicas of the object as well as the requirements of each replica. // return matched value to indicate whether there is a matching hook. func (e *CustomizedExplorer) GetReplicas(ctx context.Context, operation configv1alpha1.OperationType, diff --git a/pkg/crdexplorer/defaultexplorer/default.go b/pkg/crdexplorer/defaultexplorer/default.go index 6608a9985..d767ae00c 100644 --- a/pkg/crdexplorer/defaultexplorer/default.go +++ b/pkg/crdexplorer/defaultexplorer/default.go @@ -15,7 +15,7 @@ import ( // DefaultExplorer contains all default operation explorer factory // for exploring common resource. type DefaultExplorer struct { - replicaHandlers map[schema.GroupVersionKind]replicaFactory + replicaHandlers map[schema.GroupVersionKind]replicaExplorer packingHandlers map[schema.GroupVersionKind]packingFactory healthyHandlers map[schema.GroupVersionKind]healthyFactory } @@ -29,10 +29,21 @@ func NewDefaultExplorer() *DefaultExplorer { } } +// HookEnabled tells if any hook exist for specific resource type and operation type. +func (e *DefaultExplorer) HookEnabled(kind schema.GroupVersionKind, operationType configv1alpha1.OperationType) bool { + switch operationType { + case configv1alpha1.ExploreReplica: + if _, exist := e.replicaHandlers[kind]; exist { + return true + } + // TODO(RainbowMango): more cases should be added here + } + return false +} + // GetReplicas returns the desired replicas of the object as well as the requirements of each replica. func (e *DefaultExplorer) GetReplicas(object runtime.Object) (int32, *workv1alpha2.ReplicaRequirements, error) { - // judge object type, and then get correct kind. - _, exist := e.replicaHandlers[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)] + _, exist := e.replicaHandlers[object.GetObjectKind().GroupVersionKind()] if !exist { return 0, &workv1alpha2.ReplicaRequirements{}, fmt.Errorf("defalut explorer for operation %s not found", configv1alpha1.ExploreReplica) } diff --git a/pkg/crdexplorer/defaultexplorer/replica.go b/pkg/crdexplorer/defaultexplorer/replica.go index 519f70751..62e361c86 100644 --- a/pkg/crdexplorer/defaultexplorer/replica.go +++ b/pkg/crdexplorer/defaultexplorer/replica.go @@ -1,30 +1,69 @@ package defaultexplorer import ( + "fmt" + appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/helper" ) -// replicaFactory return default replica factory that can be used to obtain replica -// and requirements by each replica from the input object. -type replicaFactory func(object runtime.Object) (int32, *workv1alpha2.ReplicaRequirements, error) +// replicaExplorer is the function that used to parse replica and requirements from object. +type replicaExplorer func(object runtime.Object) (int32, *workv1alpha2.ReplicaRequirements, error) -func getAllDefaultReplicaExplorer() map[schema.GroupVersionKind]replicaFactory { - explorers := make(map[schema.GroupVersionKind]replicaFactory) +func getAllDefaultReplicaExplorer() map[schema.GroupVersionKind]replicaExplorer { + explorers := make(map[schema.GroupVersionKind]replicaExplorer) explorers[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)] = deployReplicaExplorer explorers[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = jobReplicaExplorer return explorers } func deployReplicaExplorer(object runtime.Object) (int32, *workv1alpha2.ReplicaRequirements, error) { - return 0, &workv1alpha2.ReplicaRequirements{}, nil + unstructuredObj, ok := object.(*unstructured.Unstructured) + if !ok { + return 0, nil, fmt.Errorf("unexpected object type, requires unstructured") + } + + deploy, err := helper.ConvertToDeployment(unstructuredObj) + if err != nil { + klog.Errorf("Failed to convert object(%s), err", object.GetObjectKind().GroupVersionKind().String(), err) + return 0, nil, err + } + + var replica int32 + if deploy.Spec.Replicas != nil { + replica = *deploy.Spec.Replicas + } + requirement := helper.GenerateReplicaRequirements(&deploy.Spec.Template) + + return replica, requirement, nil } func jobReplicaExplorer(object runtime.Object) (int32, *workv1alpha2.ReplicaRequirements, error) { - return 0, &workv1alpha2.ReplicaRequirements{}, nil + unstructuredObj, ok := object.(*unstructured.Unstructured) + if !ok { + return 0, nil, fmt.Errorf("unexpected object type, requires unstructured") + } + + job, err := helper.ConvertToJob(unstructuredObj) + if err != nil { + klog.Errorf("Failed to convert object(%s), err", object.GetObjectKind().GroupVersionKind().String(), err) + return 0, nil, err + } + + var replica int32 + // parallelism might never be nil as the kube-apiserver will set it to 1 by default if not specified. + if job.Spec.Parallelism != nil { + replica = *job.Spec.Parallelism + } + requirement := helper.GenerateReplicaRequirements(&job.Spec.Template) + + return replica, requirement, nil } diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index 5f0600282..3b7cb70c1 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -8,7 +8,6 @@ import ( "sync" "time" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -26,8 +25,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/manager" + configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/crdexplorer" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/informermanager" @@ -49,6 +50,9 @@ type ResourceDetector struct { Processor util.AsyncWorker SkippedResourceConfig *util.SkippedResourceConfig SkippedPropagatingNamespaces map[string]struct{} + // ResourceExplorer knows the details of resource structure. + ResourceExplorer crdexplorer.CustomResourceExplorer + // policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and // a reconcile function to consume the items in queue. policyReconcileWorker util.AsyncWorker @@ -617,10 +621,6 @@ func (d *ResourceDetector) ClaimClusterPolicyForObject(object *unstructured.Unst // BuildResourceBinding builds a desired ResourceBinding for object. func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string) (*workv1alpha2.ResourceBinding, error) { bindingName := names.GenerateBindingName(object.GetKind(), object.GetName()) - replicaRequirements, replicas, err := d.GetReplicaDeclaration(object) - if err != nil { - return nil, err - } propagationBinding := &workv1alpha2.ResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: bindingName, @@ -639,21 +639,25 @@ func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructure Name: object.GetName(), ResourceVersion: object.GetResourceVersion(), }, - ReplicaRequirements: replicaRequirements, - Replicas: replicas, }, } + if d.ResourceExplorer.HookEnabled(object.GroupVersionKind(), configv1alpha1.ExploreReplica) { + replicas, replicaRequirements, err := d.ResourceExplorer.GetReplicas(object) + if err != nil { + klog.Errorf("Failed to customize replicas for %s(%s), %v", object.GroupVersionKind(), object.GetName(), err) + return nil, err + } + propagationBinding.Spec.Replicas = replicas + propagationBinding.Spec.ReplicaRequirements = replicaRequirements + } + return propagationBinding, nil } // BuildClusterResourceBinding builds a desired ClusterResourceBinding for object. func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string) (*workv1alpha2.ClusterResourceBinding, error) { bindingName := names.GenerateBindingName(object.GetKind(), object.GetName()) - replicaRequirements, replicas, err := d.GetReplicaDeclaration(object) - if err != nil { - return nil, err - } binding := &workv1alpha2.ClusterResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: bindingName, @@ -670,63 +674,22 @@ func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unst Name: object.GetName(), ResourceVersion: object.GetResourceVersion(), }, - ReplicaRequirements: replicaRequirements, - Replicas: replicas, }, } + if d.ResourceExplorer.HookEnabled(object.GroupVersionKind(), configv1alpha1.ExploreReplica) { + replicas, replicaRequirements, err := d.ResourceExplorer.GetReplicas(object) + if err != nil { + klog.Errorf("Failed to customize replicas for %s(%s), %v", object.GroupVersionKind(), object.GetName(), err) + return nil, err + } + binding.Spec.Replicas = replicas + binding.Spec.ReplicaRequirements = replicaRequirements + } + return binding, nil } -// GetReplicaDeclaration get the replicas and resource requirements of a Deployment object -func (d *ResourceDetector) GetReplicaDeclaration(object *unstructured.Unstructured) (*workv1alpha2.ReplicaRequirements, int32, error) { - switch object.GetKind() { - case util.DeploymentKind: - replicas, ok, err := unstructured.NestedInt64(object.Object, util.SpecField, util.ReplicasField) - if !ok || err != nil { - return nil, 0, err - } - podTemplate, ok, err := unstructured.NestedMap(object.Object, util.SpecField, util.TemplateField) - if !ok || err != nil { - return nil, 0, err - } - replicaRequirements, err := d.getReplicaRequirements(podTemplate) - if err != nil { - return nil, 0, err - } - return replicaRequirements, int32(replicas), nil - case util.JobKind: - replicas, ok, err := unstructured.NestedInt64(object.Object, util.SpecField, util.ParallelismField) - if !ok || err != nil { - return nil, 0, err - } - podTemplate, ok, err := unstructured.NestedMap(object.Object, util.SpecField, util.TemplateField) - if !ok || err != nil { - return nil, 0, err - } - replicaRequirements, err := d.getReplicaRequirements(podTemplate) - if err != nil { - return nil, 0, err - } - return replicaRequirements, int32(replicas), nil - } - return nil, 0, nil -} - -func (d *ResourceDetector) getReplicaRequirements(object map[string]interface{}) (*workv1alpha2.ReplicaRequirements, error) { - var podTemplateSpec *corev1.PodTemplateSpec - err := runtime.DefaultUnstructuredConverter.FromUnstructured(object, &podTemplateSpec) - if err != nil { - return nil, err - } - res := util.EmptyResource().AddPodRequest(&podTemplateSpec.Spec) - replicaRequirements := &workv1alpha2.ReplicaRequirements{ - NodeClaim: helper.GenerateNodeClaimByPodSpec(&podTemplateSpec.Spec), - ResourceRequest: res.ResourceList(), - } - return replicaRequirements, nil -} - // AddWaiting adds object's key to waiting list. func (d *ResourceDetector) AddWaiting(objectKey keys.ClusterWideKey) { d.waitingLock.Lock() diff --git a/pkg/util/helper/binding.go b/pkg/util/helper/binding.go index a5bf067b0..1db28a928 100644 --- a/pkg/util/helper/binding.go +++ b/pkg/util/helper/binding.go @@ -21,6 +21,7 @@ import ( workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/informermanager" "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/pkg/util/restmapper" @@ -307,3 +308,18 @@ func GenerateNodeClaimByPodSpec(podSpec *corev1.PodSpec) *workv1alpha2.NodeClaim } return nodeClaim } + +// GenerateReplicaRequirements generates replica requirements for node and resources. +func GenerateReplicaRequirements(podTemplate *corev1.PodTemplateSpec) *workv1alpha2.ReplicaRequirements { + nodeClaim := GenerateNodeClaimByPodSpec(&podTemplate.Spec) + resourceRequest := util.EmptyResource().AddPodRequest(&podTemplate.Spec).ResourceList() + + if nodeClaim != nil || resourceRequest != nil { + return &workv1alpha2.ReplicaRequirements{ + NodeClaim: nodeClaim, + ResourceRequest: resourceRequest, + } + } + + return nil +}