Merge pull request #951 from RainbowMango/pr_enable_explorer

Setup custom resource explorer
This commit is contained in:
karmada-bot 2021-11-12 19:26:32 +08:00 committed by GitHub
commit c67874689e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 125 additions and 74 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/mcs"
"github.com/karmada-io/karmada/pkg/controllers/namespace"
"github.com/karmada-io/karmada/pkg/controllers/status"
"github.com/karmada-io/karmada/pkg/crdexplorer"
"github.com/karmada-io/karmada/pkg/detector"
"github.com/karmada-io/karmada/pkg/karmadactl"
"github.com/karmada-io/karmada/pkg/util"
@ -123,6 +124,12 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
}
controlPlaneInformerManager := informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
crdExplorer := crdexplorer.NewCustomResourceExplorer("", controlPlaneInformerManager)
if err := mgr.Add(crdExplorer); err != nil {
klog.Fatalf("Failed to setup custom resource explorer: %v", err)
}
resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
Client: mgr.GetClient(),
@ -131,6 +138,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
DynamicClient: dynamicClientSet,
SkippedResourceConfig: skippedResourceConfig,
SkippedPropagatingNamespaces: skippedPropagatingNamespaces,
ResourceExplorer: crdExplorer,
}
if err := mgr.Add(resourceDetector); err != nil {
klog.Fatalf("Failed to setup resource detector: %v", err)

View File

@ -4,6 +4,7 @@ import (
"context"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
@ -17,6 +18,8 @@ import (
type CustomResourceExplorer interface {
// Start starts running the component and will never stop running until the context is closed or an error occurs.
Start(ctx context.Context) (err error)
// HookEnabled tells if any hook exist for specific resource type and operation type.
HookEnabled(kind schema.GroupVersionKind, operationType configv1alpha1.OperationType) bool
// GetReplicas returns the desired replicas of the object as well as the requirements of each replica.
GetReplicas(object runtime.Object) (replica int32, replicaRequires *workv1alpha2.ReplicaRequirements, err error)
// GetHealthy tells if the object in healthy state.
@ -25,8 +28,8 @@ type CustomResourceExplorer interface {
// other common method
}
// NewCustomResourceExplore return a new CustomResourceExplorer object.
func NewCustomResourceExplore(kubeconfig string, informer informermanager.SingleClusterInformerManager) CustomResourceExplorer {
// NewCustomResourceExplorer builds a new CustomResourceExplorer object.
func NewCustomResourceExplorer(kubeconfig string, informer informermanager.SingleClusterInformerManager) CustomResourceExplorer {
return &customResourceExplorerImpl{
kubeconfig: kubeconfig,
informer: informer,
@ -58,6 +61,11 @@ func (i *customResourceExplorerImpl) Start(ctx context.Context) (err error) {
return nil
}
// HookEnabled tells if any hook exist for specific resource type and operation type.
func (i *customResourceExplorerImpl) HookEnabled(kind schema.GroupVersionKind, operationType configv1alpha1.OperationType) bool {
return i.customizedExplorer.HookEnabled(kind, operationType) || i.defaultExplorer.HookEnabled(kind, operationType)
}
// GetReplicas returns the desired replicas of the object as well as the requirements of each replica.
func (i *customResourceExplorerImpl) GetReplicas(object runtime.Object) (replica int32, replicaRequires *workv1alpha2.ReplicaRequirements, err error) {
var hookMatched bool

View File

@ -41,6 +41,12 @@ func NewCustomizedExplorer(kubeconfig string, informer informermanager.SingleClu
}, nil
}
// HookEnabled tells if any hook exist for specific resource type and operation type.
func (e *CustomizedExplorer) HookEnabled(kind schema.GroupVersionKind, operationType configv1alpha1.OperationType) bool {
// TODO(RainbowMango): Check if any hook configured
return false
}
// GetReplicas returns the desired replicas of the object as well as the requirements of each replica.
// return matched value to indicate whether there is a matching hook.
func (e *CustomizedExplorer) GetReplicas(ctx context.Context, operation configv1alpha1.OperationType,

View File

@ -15,7 +15,7 @@ import (
// DefaultExplorer contains all default operation explorer factory
// for exploring common resource.
type DefaultExplorer struct {
replicaHandlers map[schema.GroupVersionKind]replicaFactory
replicaHandlers map[schema.GroupVersionKind]replicaExplorer
packingHandlers map[schema.GroupVersionKind]packingFactory
healthyHandlers map[schema.GroupVersionKind]healthyFactory
}
@ -29,10 +29,21 @@ func NewDefaultExplorer() *DefaultExplorer {
}
}
// HookEnabled tells if any hook exist for specific resource type and operation type.
func (e *DefaultExplorer) HookEnabled(kind schema.GroupVersionKind, operationType configv1alpha1.OperationType) bool {
switch operationType {
case configv1alpha1.ExploreReplica:
if _, exist := e.replicaHandlers[kind]; exist {
return true
}
// TODO(RainbowMango): more cases should be added here
}
return false
}
// GetReplicas returns the desired replicas of the object as well as the requirements of each replica.
func (e *DefaultExplorer) GetReplicas(object runtime.Object) (int32, *workv1alpha2.ReplicaRequirements, error) {
// judge object type, and then get correct kind.
_, exist := e.replicaHandlers[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)]
_, exist := e.replicaHandlers[object.GetObjectKind().GroupVersionKind()]
if !exist {
return 0, &workv1alpha2.ReplicaRequirements{}, fmt.Errorf("defalut explorer for operation %s not found", configv1alpha1.ExploreReplica)
}

View File

@ -1,30 +1,69 @@
package defaultexplorer
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
)
// replicaFactory return default replica factory that can be used to obtain replica
// and requirements by each replica from the input object.
type replicaFactory func(object runtime.Object) (int32, *workv1alpha2.ReplicaRequirements, error)
// replicaExplorer is the function that used to parse replica and requirements from object.
type replicaExplorer func(object runtime.Object) (int32, *workv1alpha2.ReplicaRequirements, error)
func getAllDefaultReplicaExplorer() map[schema.GroupVersionKind]replicaFactory {
explorers := make(map[schema.GroupVersionKind]replicaFactory)
func getAllDefaultReplicaExplorer() map[schema.GroupVersionKind]replicaExplorer {
explorers := make(map[schema.GroupVersionKind]replicaExplorer)
explorers[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)] = deployReplicaExplorer
explorers[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = jobReplicaExplorer
return explorers
}
func deployReplicaExplorer(object runtime.Object) (int32, *workv1alpha2.ReplicaRequirements, error) {
return 0, &workv1alpha2.ReplicaRequirements{}, nil
unstructuredObj, ok := object.(*unstructured.Unstructured)
if !ok {
return 0, nil, fmt.Errorf("unexpected object type, requires unstructured")
}
deploy, err := helper.ConvertToDeployment(unstructuredObj)
if err != nil {
klog.Errorf("Failed to convert object(%s), err", object.GetObjectKind().GroupVersionKind().String(), err)
return 0, nil, err
}
var replica int32
if deploy.Spec.Replicas != nil {
replica = *deploy.Spec.Replicas
}
requirement := helper.GenerateReplicaRequirements(&deploy.Spec.Template)
return replica, requirement, nil
}
func jobReplicaExplorer(object runtime.Object) (int32, *workv1alpha2.ReplicaRequirements, error) {
return 0, &workv1alpha2.ReplicaRequirements{}, nil
unstructuredObj, ok := object.(*unstructured.Unstructured)
if !ok {
return 0, nil, fmt.Errorf("unexpected object type, requires unstructured")
}
job, err := helper.ConvertToJob(unstructuredObj)
if err != nil {
klog.Errorf("Failed to convert object(%s), err", object.GetObjectKind().GroupVersionKind().String(), err)
return 0, nil, err
}
var replica int32
// parallelism might never be nil as the kube-apiserver will set it to 1 by default if not specified.
if job.Spec.Parallelism != nil {
replica = *job.Spec.Parallelism
}
requirement := helper.GenerateReplicaRequirements(&job.Spec.Template)
return replica, requirement, nil
}

View File

@ -8,7 +8,6 @@ import (
"sync"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -26,8 +25,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/crdexplorer"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/informermanager"
@ -49,6 +50,9 @@ type ResourceDetector struct {
Processor util.AsyncWorker
SkippedResourceConfig *util.SkippedResourceConfig
SkippedPropagatingNamespaces map[string]struct{}
// ResourceExplorer knows the details of resource structure.
ResourceExplorer crdexplorer.CustomResourceExplorer
// policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and
// a reconcile function to consume the items in queue.
policyReconcileWorker util.AsyncWorker
@ -617,10 +621,6 @@ func (d *ResourceDetector) ClaimClusterPolicyForObject(object *unstructured.Unst
// BuildResourceBinding builds a desired ResourceBinding for object.
func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string) (*workv1alpha2.ResourceBinding, error) {
bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
replicaRequirements, replicas, err := d.GetReplicaDeclaration(object)
if err != nil {
return nil, err
}
propagationBinding := &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: bindingName,
@ -639,21 +639,25 @@ func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructure
Name: object.GetName(),
ResourceVersion: object.GetResourceVersion(),
},
ReplicaRequirements: replicaRequirements,
Replicas: replicas,
},
}
if d.ResourceExplorer.HookEnabled(object.GroupVersionKind(), configv1alpha1.ExploreReplica) {
replicas, replicaRequirements, err := d.ResourceExplorer.GetReplicas(object)
if err != nil {
klog.Errorf("Failed to customize replicas for %s(%s), %v", object.GroupVersionKind(), object.GetName(), err)
return nil, err
}
propagationBinding.Spec.Replicas = replicas
propagationBinding.Spec.ReplicaRequirements = replicaRequirements
}
return propagationBinding, nil
}
// BuildClusterResourceBinding builds a desired ClusterResourceBinding for object.
func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string) (*workv1alpha2.ClusterResourceBinding, error) {
bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
replicaRequirements, replicas, err := d.GetReplicaDeclaration(object)
if err != nil {
return nil, err
}
binding := &workv1alpha2.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: bindingName,
@ -670,63 +674,22 @@ func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unst
Name: object.GetName(),
ResourceVersion: object.GetResourceVersion(),
},
ReplicaRequirements: replicaRequirements,
Replicas: replicas,
},
}
if d.ResourceExplorer.HookEnabled(object.GroupVersionKind(), configv1alpha1.ExploreReplica) {
replicas, replicaRequirements, err := d.ResourceExplorer.GetReplicas(object)
if err != nil {
klog.Errorf("Failed to customize replicas for %s(%s), %v", object.GroupVersionKind(), object.GetName(), err)
return nil, err
}
binding.Spec.Replicas = replicas
binding.Spec.ReplicaRequirements = replicaRequirements
}
return binding, nil
}
// GetReplicaDeclaration get the replicas and resource requirements of a Deployment object
func (d *ResourceDetector) GetReplicaDeclaration(object *unstructured.Unstructured) (*workv1alpha2.ReplicaRequirements, int32, error) {
switch object.GetKind() {
case util.DeploymentKind:
replicas, ok, err := unstructured.NestedInt64(object.Object, util.SpecField, util.ReplicasField)
if !ok || err != nil {
return nil, 0, err
}
podTemplate, ok, err := unstructured.NestedMap(object.Object, util.SpecField, util.TemplateField)
if !ok || err != nil {
return nil, 0, err
}
replicaRequirements, err := d.getReplicaRequirements(podTemplate)
if err != nil {
return nil, 0, err
}
return replicaRequirements, int32(replicas), nil
case util.JobKind:
replicas, ok, err := unstructured.NestedInt64(object.Object, util.SpecField, util.ParallelismField)
if !ok || err != nil {
return nil, 0, err
}
podTemplate, ok, err := unstructured.NestedMap(object.Object, util.SpecField, util.TemplateField)
if !ok || err != nil {
return nil, 0, err
}
replicaRequirements, err := d.getReplicaRequirements(podTemplate)
if err != nil {
return nil, 0, err
}
return replicaRequirements, int32(replicas), nil
}
return nil, 0, nil
}
func (d *ResourceDetector) getReplicaRequirements(object map[string]interface{}) (*workv1alpha2.ReplicaRequirements, error) {
var podTemplateSpec *corev1.PodTemplateSpec
err := runtime.DefaultUnstructuredConverter.FromUnstructured(object, &podTemplateSpec)
if err != nil {
return nil, err
}
res := util.EmptyResource().AddPodRequest(&podTemplateSpec.Spec)
replicaRequirements := &workv1alpha2.ReplicaRequirements{
NodeClaim: helper.GenerateNodeClaimByPodSpec(&podTemplateSpec.Spec),
ResourceRequest: res.ResourceList(),
}
return replicaRequirements, nil
}
// AddWaiting adds object's key to waiting list.
func (d *ResourceDetector) AddWaiting(objectKey keys.ClusterWideKey) {
d.waitingLock.Lock()

View File

@ -21,6 +21,7 @@ import (
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/restmapper"
@ -307,3 +308,18 @@ func GenerateNodeClaimByPodSpec(podSpec *corev1.PodSpec) *workv1alpha2.NodeClaim
}
return nodeClaim
}
// GenerateReplicaRequirements generates replica requirements for node and resources.
func GenerateReplicaRequirements(podTemplate *corev1.PodTemplateSpec) *workv1alpha2.ReplicaRequirements {
nodeClaim := GenerateNodeClaimByPodSpec(&podTemplate.Spec)
resourceRequest := util.EmptyResource().AddPodRequest(&podTemplate.Spec).ResourceList()
if nodeClaim != nil || resourceRequest != nil {
return &workv1alpha2.ReplicaRequirements{
NodeClaim: nodeClaim,
ResourceRequest: resourceRequest,
}
}
return nil
}