Add provreqOrchestrator that handles ProvReq classes (#6627)

* Add provreqOrchestrator that handles ProvReq classes
* Review remarks
* Review remarks
parent af1e610c61
commit 5f94f2c429
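In short, the ProvisioningRequest scale-up path is now built as its own orchestrator and handed to the wrapper, instead of the wrapper constructing it from a kube config. A condensed sketch of the new wiring, taken from the buildAutoscaler hunk below (error handling abbreviated, surrounding setup omitted):

	// Build the ProvisioningRequest orchestrator first, then wrap it together
	// with the regular scale-up orchestrator and register it in the options.
	provreqOrchestrator, err := provreqorchestrator.New(restConfig)
	if err != nil {
		return nil, err
	}
	scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)
	opts.ScaleUpOrchestrator = scaleUpOrchestrator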
@@ -60,6 +60,7 @@ import (
	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates"
	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/previouscandidates"
	provreqorchestrator "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/orchestrator"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/options"

@@ -494,10 +495,12 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
	podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

	restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
	scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(restConfig)
	provreqOrchestrator, err := provreqorchestrator.New(restConfig)
	if err != nil {
		return nil, err
	}
	scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)

	opts.ScaleUpOrchestrator = scaleUpOrchestrator
	provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()})
	if err != nil {

@@ -42,7 +42,7 @@ var SupportedProvisioningClasses = []string{v1beta1.ProvisioningClassCheckCapaci

// ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
	client provisioningRequestClient
	client *provreqclient.ProvisioningRequestClient
	clock  clock.PassiveClock
}

@@ -30,15 +30,10 @@ type ProvisioningRequestProcessor interface {
	CleanUp()
}

type provisioningRequestClient interface {
	ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error)
	ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
}

// CombinedProvReqProcessor is responsible for processing ProvisioningRequest for each ProvisioningClass
// every CA loop and updating conditions for expired ProvisioningRequests.
type CombinedProvReqProcessor struct {
	client     provisioningRequestClient
	client     *provreqclient.ProvisioningRequestClient
	processors []ProvisioningRequestProcessor
}

@@ -1,180 +0,0 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package checkcapacity

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/estimator"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
	provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
	"k8s.io/client-go/rest"
	"k8s.io/klog/v2"

	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type provisioningRequestClient interface {
	ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error)
	ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
}

type provReqOrchestrator struct {
	initialized bool
	context     *context.AutoscalingContext
	client      provisioningRequestClient
	injector    *scheduling.HintingSimulator
}

// New return new orchestrator.
func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) {
	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
	if err != nil {
		return nil, err
	}

	return &provReqOrchestrator{client: client}, nil
}

// Initialize initialize orchestrator.
func (o *provReqOrchestrator) Initialize(
	autoscalingContext *context.AutoscalingContext,
	processors *ca_processors.AutoscalingProcessors,
	clusterStateRegistry *clusterstate.ClusterStateRegistry,
	estimatorBuilder estimator.EstimatorBuilder,
	taintConfig taints.TaintConfig,
) {
	o.initialized = true
	o.context = autoscalingContext
	o.injector = scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker)
}

// ScaleUp return if there is capacity in the cluster for pods from ProvisioningRequest.
func (o *provReqOrchestrator) ScaleUp(
	unschedulablePods []*apiv1.Pod,
	nodes []*apiv1.Node,
	daemonSets []*appsv1.DaemonSet,
	nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
	if !o.initialized {
		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
	}
	if len(unschedulablePods) == 0 {
		return &status.ScaleUpStatus{}, nil
	}
	if _, err := o.verifyProvisioningRequestClass(unschedulablePods); err != nil {
		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error()))
	}

	o.context.ClusterSnapshot.Fork()
	defer o.context.ClusterSnapshot.Revert()
	if err := o.bookCapacity(); err != nil {
		return nil, errors.NewAutoscalerError(errors.InternalError, err.Error())
	}
	scaleUpIsSuccessful, err := o.scaleUp(unschedulablePods)
	if err != nil {
		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error()))
	}
	if scaleUpIsSuccessful {
		return &status.ScaleUpStatus{Result: status.ScaleUpSuccessful}, nil
	}
	return &status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable}, nil
}

// ScaleUpToNodeGroupMinSize is no-op.
func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize(
	nodes []*apiv1.Node,
	nodeInfos map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) {
	return nil, nil
}

func (o *provReqOrchestrator) bookCapacity() error {
	provReqs, err := o.client.ProvisioningRequests()
	if err != nil {
		return fmt.Errorf("Couldn't fetch ProvisioningRequests in the cluster: %v", err)
	}
	podsToCreate := []*apiv1.Pod{}
	for _, provReq := range provReqs {
		if conditions.ShouldCapacityBeBooked(provReq) {
			pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
			if err != nil {
				// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
				// If there is an error, mark PR as invalid, because we won't be able to book capacity
				// for it anyway.
				conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
				continue
			}
			podsToCreate = append(podsToCreate, pods...)
		}
	}
	if len(podsToCreate) == 0 {
		return nil
	}
	// scheduling the pods to reserve capacity for provisioning request with BookCapacity condition
	if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
		klog.Warningf("Error during capacity booking: %v", err)
	}
	return nil
}

// Assuming that all unschedulable pods comes from one ProvisioningRequest.
func (o *provReqOrchestrator) scaleUp(unschedulablePods []*apiv1.Pod) (bool, error) {
	provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name)
	if err != nil {
		return false, fmt.Errorf("Failed retrive ProvisioningRequest from unscheduled pods, err: %v", err)
	}
	st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true)
	if len(st) < len(unschedulablePods) || err != nil {
		conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
		return false, err
	}
	conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now())
	return true, nil
}

// verifyPods check that all pods belong to one ProvisioningRequest that belongs to check-capacity ProvisioningRequst class.
func (o *provReqOrchestrator) verifyProvisioningRequestClass(unschedulablePods []*apiv1.Pod) (*provreqwrapper.ProvisioningRequest, error) {
	provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name)
	if err != nil {
		return nil, fmt.Errorf("Failed retrive ProvisioningRequest from unscheduled pods, err: %v", err)
	}
	if provReq.Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity {
		return nil, fmt.Errorf("ProvisioningRequestClass is not %s", v1beta1.ProvisioningClassCheckCapacity)
	}
	for _, pod := range unschedulablePods {
		if pod.Namespace != unschedulablePods[0].Namespace {
			return nil, fmt.Errorf("Pods %s and %s are from different namespaces", pod.Name, unschedulablePods[0].Name)
		}
		if pod.OwnerReferences[0].Name != unschedulablePods[0].OwnerReferences[0].Name {
			return nil, fmt.Errorf("Pods %s and %s have different OwnerReference", pod.Name, unschedulablePods[0].Name)
		}
	}
	return provReq, nil
}

@@ -0,0 +1,104 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package checkcapacity

import (
	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/estimator"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"

	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type checkCapacityProvClass struct {
	context  *context.AutoscalingContext
	client   *provreqclient.ProvisioningRequestClient
	injector *scheduling.HintingSimulator
}

// New create check-capacity scale-up mode.
func New(
	client *provreqclient.ProvisioningRequestClient,
) *checkCapacityProvClass {
	return &checkCapacityProvClass{client: client}
}

func (o *checkCapacityProvClass) Initialize(
	autoscalingContext *context.AutoscalingContext,
	processors *ca_processors.AutoscalingProcessors,
	clusterStateRegistry *clusterstate.ClusterStateRegistry,
	estimatorBuilder estimator.EstimatorBuilder,
	taintConfig taints.TaintConfig,
	injector *scheduling.HintingSimulator,
) {
	o.context = autoscalingContext
	o.injector = injector
}

// Provision return if there is capacity in the cluster for pods from ProvisioningRequest.
func (o *checkCapacityProvClass) Provision(
	unschedulablePods []*apiv1.Pod,
	nodes []*apiv1.Node,
	daemonSets []*appsv1.DaemonSet,
	nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
	if len(unschedulablePods) == 0 {
		return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
	}
	pr, err := provreqclient.ProvisioningRequestForPods(o.client, unschedulablePods)
	if err != nil {
		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error()))
	}
	if pr.Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity {
		return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
	}

	o.context.ClusterSnapshot.Fork()
	defer o.context.ClusterSnapshot.Revert()

	scaleUpIsSuccessful, err := o.checkcapacity(unschedulablePods, pr)
	if err != nil {
		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error()))
	}
	if scaleUpIsSuccessful {
		return &status.ScaleUpStatus{Result: status.ScaleUpSuccessful}, nil
	}
	return &status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable}, nil
}

// Assuming that all unschedulable pods comes from one ProvisioningRequest.
func (o *checkCapacityProvClass) checkcapacity(unschedulablePods []*apiv1.Pod, provReq *provreqwrapper.ProvisioningRequest) (bool, error) {
	st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true)
	if len(st) < len(unschedulablePods) || err != nil {
		conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
		return false, err
	}
	conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now())
	return true, nil
}

@@ -0,0 +1,148 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/estimator"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
	provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
	ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
	"k8s.io/client-go/rest"
	"k8s.io/klog/v2"

	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type provisioningClass interface {
	Provision([]*apiv1.Pod, []*apiv1.Node, []*appsv1.DaemonSet,
		map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, ca_errors.AutoscalerError)
	Initialize(*context.AutoscalingContext, *ca_processors.AutoscalingProcessors, *clusterstate.ClusterStateRegistry,
		estimator.EstimatorBuilder, taints.TaintConfig, *scheduling.HintingSimulator)
}

// provReqOrchestrator is an orchestrator that contains orchestrators for all supported Provisioning Classes.
type provReqOrchestrator struct {
	initialized         bool
	context             *context.AutoscalingContext
	client              *provreqclient.ProvisioningRequestClient
	injector            *scheduling.HintingSimulator
	provisioningClasses []provisioningClass
}

// New return new orchestrator.
func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) {
	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
	if err != nil {
		return nil, err
	}

	return &provReqOrchestrator{client: client, provisioningClasses: []provisioningClass{checkcapacity.New(client)}}, nil
}

// Initialize initialize orchestrator.
func (o *provReqOrchestrator) Initialize(
	autoscalingContext *context.AutoscalingContext,
	processors *ca_processors.AutoscalingProcessors,
	clusterStateRegistry *clusterstate.ClusterStateRegistry,
	estimatorBuilder estimator.EstimatorBuilder,
	taintConfig taints.TaintConfig,
) {
	o.initialized = true
	o.context = autoscalingContext
	o.injector = scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker)
	for _, mode := range o.provisioningClasses {
		mode.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig, o.injector)
	}
}

// ScaleUp run ScaleUp for each Provisionining Class. As of now, CA pick one ProvisioningRequest,
// so only one ProvisioningClass return non empty scaleUp result.
// In case we implement multiple ProvisioningRequest ScaleUp, the function should return combined status
func (o *provReqOrchestrator) ScaleUp(
	unschedulablePods []*apiv1.Pod,
	nodes []*apiv1.Node,
	daemonSets []*appsv1.DaemonSet,
	nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
	if !o.initialized {
		return &status.ScaleUpStatus{}, ca_errors.ToAutoscalerError(ca_errors.InternalError, fmt.Errorf("provisioningrequest.Orchestrator is not initialized"))
	}

	o.context.ClusterSnapshot.Fork()
	defer o.context.ClusterSnapshot.Revert()
	o.bookCapacity()

	// unschedulablePods pods should belong to one ProvisioningClass, so only one provClass should try to ScaleUp.
	for _, provClass := range o.provisioningClasses {
		st, err := provClass.Provision(unschedulablePods, nodes, daemonSets, nodeInfos)
		if err != nil || st != nil && st.Result != status.ScaleUpNotTried {
			return st, err
		}
	}
	return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
}

// ScaleUpToNodeGroupMinSize doesn't have implementation for ProvisioningRequest Orchestrator.
func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize(
	nodes []*apiv1.Node,
	nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
	return nil, nil
}

func (o *provReqOrchestrator) bookCapacity() error {
	provReqs, err := o.client.ProvisioningRequests()
	if err != nil {
		return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
	}
	podsToCreate := []*apiv1.Pod{}
	for _, provReq := range provReqs {
		if conditions.ShouldCapacityBeBooked(provReq) {
			pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
			if err != nil {
				// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
				// If there is an error, mark PR as invalid, because we won't be able to book capacity
				// for it anyway.
				conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
				continue
			}
			podsToCreate = append(podsToCreate, pods...)
		}
	}
	if len(podsToCreate) == 0 {
		return nil
	}
	// scheduling the pods to reserve capacity for provisioning request with BookCapacity condition
	if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
		klog.Warningf("Error during capacity booking: %v", err)
	}
	return nil
}

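The provisioningClass interface above is the extension point this new file introduces: each supported ProvisioningRequest class is given the orchestrator's shared HintingSimulator via Initialize, is then offered every batch of ProvisioningRequest pods via Provision, and returns ScaleUpNotTried for batches it does not own so the loop falls through to the next class. A minimal sketch of what an additional class could look like; the package and type names here are illustrative and not part of this PR, and a real class would also have to be appended to provisioningClasses in New():

	package hypotheticalclass // illustrative only; not part of this PR

	import (
		appsv1 "k8s.io/api/apps/v1"
		apiv1 "k8s.io/api/core/v1"
		"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
		"k8s.io/autoscaler/cluster-autoscaler/context"
		"k8s.io/autoscaler/cluster-autoscaler/estimator"
		ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
		"k8s.io/autoscaler/cluster-autoscaler/processors/status"
		"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
		"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
		"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
		schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
	)

	// HypotheticalProvClass shows the shape a new ProvisioningRequest class takes.
	type HypotheticalProvClass struct {
		context  *context.AutoscalingContext
		injector *scheduling.HintingSimulator
	}

	// Initialize matches the provisioningClass interface; the orchestrator passes its
	// shared HintingSimulator so all classes schedule against the same cluster snapshot.
	func (c *HypotheticalProvClass) Initialize(
		autoscalingContext *context.AutoscalingContext,
		processors *ca_processors.AutoscalingProcessors,
		clusterStateRegistry *clusterstate.ClusterStateRegistry,
		estimatorBuilder estimator.EstimatorBuilder,
		taintConfig taints.TaintConfig,
		injector *scheduling.HintingSimulator,
	) {
		c.context = autoscalingContext
		c.injector = injector
	}

	// Provision returns ScaleUpNotTried for pods this class does not own, so the
	// orchestrator's loop moves on to the next registered class.
	func (c *HypotheticalProvClass) Provision(
		unschedulablePods []*apiv1.Pod,
		nodes []*apiv1.Node,
		daemonSets []*appsv1.DaemonSet,
		nodeInfos map[string]*schedulerframework.NodeInfo,
	) (*status.ScaleUpStatus, errors.AutoscalerError) {
		if len(unschedulablePods) == 0 {
			return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
		}
		// A real class would check the ProvisioningClassName here and act on the request.
		return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
	}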
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package checkcapacity
package orchestrator

import (
	"context"

@@ -31,11 +31,12 @@ import (
	"k8s.io/autoscaler/cluster-autoscaler/config"
	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/kubernetes/pkg/scheduler/framework"

@@ -68,8 +69,9 @@ func TestScaleUp(t *testing.T) {
		err           bool
	}{
		{
			name:     "no ProvisioningRequests",
			provReqs: []*provreqwrapper.ProvisioningRequest{},
			name:          "no ProvisioningRequests",
			provReqs:      []*provreqwrapper.ProvisioningRequest{},
			scaleUpResult: status.ScaleUpNotTried,
		},
		{
			name: "one ProvisioningRequest",

@@ -87,7 +89,7 @@ func TestScaleUp(t *testing.T) {
			name:             "pods from different ProvisioningRequest class",
			provReqs:         []*provreqwrapper.ProvisioningRequest{newCpuProvReq, bookedCapacityProvReq, differentProvReqClass},
			provReqToScaleUp: differentProvReqClass,
			err:              true,
			scaleUpResult:    status.ScaleUpNotTried,
		},
		{
			name: "some capacity is booked, succesfull ScaleUp",

@@ -107,12 +109,12 @@ func TestScaleUp(t *testing.T) {
			clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, allNodes, nil)
			prPods, err := pods.PodsForProvisioningRequest(tc.provReqToScaleUp)
			assert.NoError(t, err)
			client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
			orchestrator := &provReqOrchestrator{
				initialized: true,
				context:     &autoscalingContext,
				client:      provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...),
				injector:    scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker),
				client:              client,
				provisioningClasses: []provisioningClass{checkcapacity.New(client)},
			}
			orchestrator.Initialize(&autoscalingContext, nil, nil, nil, taints.TaintConfig{})
			st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{})
			if !tc.err {
				assert.NoError(t, err)

@@ -17,21 +17,18 @@ limitations under the License.
package orchestrator

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
	"k8s.io/autoscaler/cluster-autoscaler/estimator"
	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
	"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
	"k8s.io/client-go/rest"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

@@ -41,20 +38,16 @@ import (
type WrapperOrchestrator struct {
	// scaleUpRegularPods indicates that ScaleUp for regular pods will be run in the current CA loop, if they are present.
	scaleUpRegularPods  bool
	scaleUpOrchestrator scaleup.Orchestrator
	podsOrchestrator    scaleup.Orchestrator
	provReqOrchestrator scaleup.Orchestrator
}

// NewWrapperOrchestrator return WrapperOrchestrator
func NewWrapperOrchestrator(kubeConfig *rest.Config) (scaleup.Orchestrator, error) {
	provReqOrchestrator, err := checkcapacity.New(kubeConfig)
	if err != nil {
		return nil, fmt.Errorf("failed create ScaleUp orchestrator for ProvisioningRequests, error: %v", err)
	}
func NewWrapperOrchestrator(provReqOrchestrator scaleup.Orchestrator) *WrapperOrchestrator {
	return &WrapperOrchestrator{
		scaleUpOrchestrator: New(),
		podsOrchestrator:    orchestrator.New(),
		provReqOrchestrator: provReqOrchestrator,
	}, nil
	}
}

// Initialize initializes the orchestrator object with required fields.

@@ -65,7 +58,7 @@ func (o *WrapperOrchestrator) Initialize(
	estimatorBuilder estimator.EstimatorBuilder,
	taintConfig taints.TaintConfig,
) {
	o.scaleUpOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig)
	o.podsOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig)
	o.provReqOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig)
}

@@ -86,7 +79,7 @@ func (o *WrapperOrchestrator) ScaleUp(
	}

	if o.scaleUpRegularPods {
		return o.scaleUpOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos)
		return o.podsOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos)
	}
	return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos)
}

@@ -110,5 +103,5 @@ func (o *WrapperOrchestrator) ScaleUpToNodeGroupMinSize(
	nodes []*apiv1.Node,
	nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
	return o.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos)
	return o.podsOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos)
}

@@ -39,10 +39,10 @@ const (
	regularPodsErrorMsg = "regularPodsError"
)

func TestScaleUp(t *testing.T) {
func TestWrapperScaleUp(t *testing.T) {
	o := WrapperOrchestrator{
		provReqOrchestrator: &fakeScaleUp{provisioningRequestErrorMsg},
		scaleUpOrchestrator: &fakeScaleUp{regularPodsErrorMsg},
		podsOrchestrator:    &fakeScaleUp{regularPodsErrorMsg},
	}
	regularPods := []*apiv1.Pod{
		BuildTestPod("pod-1", 1, 100),

@@ -157,3 +157,29 @@ func newPodTemplatesLister(client *kubernetes.Clientset, stopChannel <-chan stru
	klog.V(2).Info("Successful initial Pod Template sync")
	return podTemplLister, nil
}

// ProvisioningRequestForPods check that all pods belong to one ProvisioningRequest and return it.
func ProvisioningRequestForPods(client *ProvisioningRequestClient, unschedulablePods []*apiv1.Pod) (*provreqwrapper.ProvisioningRequest, error) {
	if len(unschedulablePods) == 0 {
		return nil, fmt.Errorf("empty unschedulablePods list")
	}
	if unschedulablePods[0].OwnerReferences == nil || len(unschedulablePods[0].OwnerReferences) == 0 {
		return nil, fmt.Errorf("pod %s has no OwnerReference", unschedulablePods[0].Name)
	}
	provReq, err := client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name)
	if err != nil {
		return nil, fmt.Errorf("failed retrive ProvisioningRequest from unscheduled pods, err: %v", err)
	}
	for _, pod := range unschedulablePods {
		if pod.Namespace != unschedulablePods[0].Namespace {
			return nil, fmt.Errorf("pods %s and %s are from different namespaces", pod.Name, unschedulablePods[0].Name)
		}
		if pod.OwnerReferences == nil || len(pod.OwnerReferences) == 0 {
			return nil, fmt.Errorf("pod %s has no OwnerReference", pod.Name)
		}
		if pod.OwnerReferences[0].Name != unschedulablePods[0].OwnerReferences[0].Name {
			return nil, fmt.Errorf("pods %s and %s have different OwnerReference", pod.Name, unschedulablePods[0].Name)
		}
	}
	return provReq, nil
}

@@ -19,9 +19,15 @@ package provreqclient
import (
	"context"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/stretchr/testify/assert"
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)

func TestFetchPodTemplates(t *testing.T) {

@@ -42,3 +48,64 @@ func TestFetchPodTemplates(t *testing.T) {
		t.Errorf("Template mismatch, diff (-want +got):\n%s", diff)
	}
}

func TestProvisioningRequestForPods(t *testing.T) {
	checkCapacityProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "check-capacity", "1m", "100", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
	customProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "custom", "1m", "100", "", int32(100), false, time.Now(), "custom")
	checkCapacityPods, _ := pods.PodsForProvisioningRequest(checkCapacityProvReq)
	customProvReqPods, _ := pods.PodsForProvisioningRequest(customProvReq)
	regularPod := BuildTestPod("p1", 600, 100)
	client := NewFakeProvisioningRequestClient(context.Background(), t, checkCapacityProvReq, customProvReq)
	testCases := []struct {
		name      string
		pods      []*apiv1.Pod
		className string
		err       bool
		pr        *provreqwrapper.ProvisioningRequest
	}{
		{
			name:      "no pods",
			pods:      []*apiv1.Pod{},
			className: "some-class",
			err:       true,
		},
		{
			name:      "pods from one Provisioning Class",
			pods:      checkCapacityPods,
			className: v1beta1.ProvisioningClassCheckCapacity,
			pr:        checkCapacityProvReq,
		},
		{
			name:      "pods from different Provisioning Classes",
			pods:      append(checkCapacityPods, customProvReqPods...),
			className: v1beta1.ProvisioningClassCheckCapacity,
			err:       true,
		},
		{
			name:      "regular pod",
			pods:      []*apiv1.Pod{regularPod},
			className: v1beta1.ProvisioningClassCheckCapacity,
			err:       true,
		},
		{
			name:      "provreq pods and regular pod",
			pods:      append(checkCapacityPods, regularPod),
			className: v1beta1.ProvisioningClassCheckCapacity,
			err:       true,
		},
	}
	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			pr, err := ProvisioningRequestForPods(client, tc.pods)
			if tc.err {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
				assert.Equal(t, pr, tc.pr)
				assert.Equal(t, pr.Spec.ProvisioningClassName, tc.className)
			}
		})
	}
}

@@ -81,6 +81,9 @@ func ToAutoscalerError(defaultType AutoscalerErrorType, err error) AutoscalerErr
	if e, ok := err.(AutoscalerError); ok {
		return e
	}
	if err == nil {
		return nil
	}
	return NewAutoscalerError(defaultType, "%v", err)
}
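With the added nil check, ToAutoscalerError can now pass through a possibly-nil error without turning it into a non-nil AutoscalerError wrapping "<nil>". A minimal illustration from a hypothetical caller (doSomething is a placeholder; the ca_errors alias matches the orchestrator code above):

	// If doSomething returns nil, autoscalerErr is now nil as well.
	if autoscalerErr := ca_errors.ToAutoscalerError(ca_errors.InternalError, doSomething()); autoscalerErr != nil {
		return autoscalerErr
	}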