ScaleUp for check-capacity ProvisioningRequestClass (#6451)

* ScaleUp for check-capacity ProvisioningRequestClass

* update condition logic

* Update tests

* Naming update

* Update cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go

Co-authored-by: Bartek Wróblewski <bwroblewski@google.com>

---------

Co-authored-by: Bartek Wróblewski <bwroblewski@google.com>
Yaroslava Serdiuk 2024-01-30 12:36:59 +02:00 committed by GitHub
parent cf171a7a04
commit ed6ebbe8ba
18 changed files with 1395 additions and 50 deletions
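
For orientation, here is a minimal sketch (a hedged illustration; the names and values are not taken from this commit) of the API object this change acts on. A ProvisioningRequest of the check-capacity class asks Cluster Autoscaler to verify that the requested pods fit on nodes already in the cluster; when they do, CA marks the request with the CapacityFound condition, books that capacity for a limited time, and consumer pods reference the request through an annotation.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
)

func main() {
	// Hypothetical check-capacity request: "do 10 replicas of pod-template fit right now?"
	pr := &v1beta1.ProvisioningRequest{
		ObjectMeta: metav1.ObjectMeta{Name: "capacity-check", Namespace: "default"},
		Spec: v1beta1.ProvisioningRequestSpec{
			ProvisioningClassName: v1beta1.ProvisioningClassCheckCapacity,
			PodSets: []v1beta1.PodSet{
				{Count: 10, PodTemplateRef: v1beta1.Reference{Name: "pod-template"}},
			},
		},
	}
	// Pods that consume the booked capacity point back at the request via this annotation.
	consumerAnnotations := map[string]string{
		"cluster-autoscaler.kubernetes.io/consume-provisioning-request": pr.Name,
	}
	fmt.Println(pr.Name, consumerAnnotations)
}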

View File

@@ -88,7 +88,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	nodeInfos map[string]*schedulerframework.NodeInfo,
 ) (*status.ScaleUpStatus, errors.AutoscalerError) {
 	if !o.initialized {
-		return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
 	}
 	loggingQuota := klogx.PodsLoggingQuota()
@@ -103,7 +103,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	upcomingNodes, aErr := o.UpcomingNodes(nodeInfos)
 	if aErr != nil {
-		return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not get upcoming nodes: "))
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not get upcoming nodes: "))
 	}
 	klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))
@@ -112,7 +112,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 		var err error
 		nodeGroups, nodeInfos, err = o.processors.NodeGroupListProcessor.Process(o.autoscalingContext, nodeGroups, nodeInfos, unschedulablePods)
 		if err != nil {
-			return scaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
+			return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
 		}
 	}
@@ -121,7 +121,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingContext, nodeInfos, nodes)
 	if aErr != nil {
-		return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
 	}
 	now := time.Now()
@@ -186,7 +186,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	newNodes, aErr := o.GetCappedNewNodeCount(bestOption.NodeCount, len(nodes)+len(upcomingNodes))
 	if aErr != nil {
-		return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
 	}
 	createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
@@ -194,7 +194,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 		oldId := bestOption.NodeGroup.Id()
 		createNodeGroupResult, aErr := o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, bestOption.NodeGroup)
 		if aErr != nil {
-			return scaleUpError(
+			return status.UpdateScaleUpError(
 				&status.ScaleUpStatus{FailedCreationNodeGroups: []cloudprovider.NodeGroup{bestOption.NodeGroup}, PodsTriggeredScaleUp: bestOption.Pods},
 				aErr)
 		}
@@ -253,7 +253,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	if !found {
 		// This should never happen, as we already should have retrieved nodeInfo for any considered nodegroup.
 		klog.Errorf("No node info for: %s", bestOption.NodeGroup.Id())
-		return scaleUpError(
+		return status.UpdateScaleUpError(
 			&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
 			errors.NewAutoscalerError(
 				errors.CloudProviderError,
@@ -263,7 +263,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	// Apply upper limits for CPU and memory.
 	newNodes, aErr = o.resourceManager.ApplyLimits(o.autoscalingContext, newNodes, resourcesLeft, nodeInfo, bestOption.NodeGroup)
 	if aErr != nil {
-		return scaleUpError(
+		return status.UpdateScaleUpError(
 			&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
 			aErr)
 	}
@@ -283,7 +283,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	scaleUpInfos, aErr := o.processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(o.autoscalingContext, targetNodeGroups, newNodes)
 	if aErr != nil {
-		return scaleUpError(
+		return status.UpdateScaleUpError(
 			&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
 			aErr)
 	}
@@ -291,7 +291,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
 	aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
 	if aErr != nil {
-		return scaleUpError(
+		return status.UpdateScaleUpError(
 			&status.ScaleUpStatus{
 				CreateNodeGroupResults: createNodeGroupResults,
 				FailedResizeNodeGroups: failedNodeGroups,
@@ -322,7 +322,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
 	nodeInfos map[string]*schedulerframework.NodeInfo,
 ) (*status.ScaleUpStatus, errors.AutoscalerError) {
 	if !o.initialized {
-		return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
 	}
 	now := time.Now()
@@ -331,7 +331,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
 	resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingContext, nodeInfos, nodes)
 	if aErr != nil {
-		return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
 	}
 	for _, ng := range nodeGroups {
@@ -397,7 +397,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
 	klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos)
 	aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
 	if aErr != nil {
-		return scaleUpError(
+		return status.UpdateScaleUpError(
 			&status.ScaleUpStatus{
 				FailedResizeNodeGroups: failedNodeGroups,
 			},
@@ -717,9 +717,3 @@ func GetPodsAwaitingEvaluation(egs []*equivalence.PodGroup, bestOption string) [
 	}
 	return awaitsEvaluation
 }
-
-func scaleUpError(s *status.ScaleUpStatus, err errors.AutoscalerError) (*status.ScaleUpStatus, errors.AutoscalerError) {
-	s.ScaleUpError = &err
-	s.Result = status.ScaleUpError
-	return s, err
-}

View File

@ -0,0 +1,112 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package orchestrator
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/client-go/rest"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
// WrapperOrchestrator is an orchestrator that wraps scale-up for ProvisioningRequests and regular pods.
// Each loop, WrapperOrchestrator splits out regular pods and pods from ProvisioningRequests, picks the group that
// wasn't picked in the last loop, and runs ScaleUp for it.
type WrapperOrchestrator struct {
// scaleUpRegularPods indicates that ScaleUp for regular pods will be run in the current CA loop, if they are present.
scaleUpRegularPods bool
scaleUpOrchestrator scaleup.Orchestrator
provReqOrchestrator scaleup.Orchestrator
}
// NewWrapperOrchestrator returns a new WrapperOrchestrator.
func NewWrapperOrchestrator(kubeConfig *rest.Config) (scaleup.Orchestrator, error) {
provReqOrchestrator, err := checkcapacity.New(kubeConfig)
if err != nil {
return nil, fmt.Errorf("failed create ScaleUp orchestrator for ProvisioningRequests, error: %v", err)
}
return &WrapperOrchestrator{
scaleUpOrchestrator: New(),
provReqOrchestrator: provReqOrchestrator,
}, nil
}
// Initialize initializes the orchestrator object with required fields.
func (o *WrapperOrchestrator) Initialize(
autoscalingContext *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
taintConfig taints.TaintConfig,
) {
o.scaleUpOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, taintConfig)
o.provReqOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, taintConfig)
}
// ScaleUp runs the scale-up logic for either regular pods or pods from ProvisioningRequests.
func (o *WrapperOrchestrator) ScaleUp(
unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
defer func() { o.scaleUpRegularPods = !o.scaleUpRegularPods }()
provReqPods, regularPods := splitOut(unschedulablePods)
if len(provReqPods) == 0 {
o.scaleUpRegularPods = true
} else if len(regularPods) == 0 {
o.scaleUpRegularPods = false
}
if o.scaleUpRegularPods {
return o.scaleUpOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos)
}
return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos)
}
func splitOut(unschedulablePods []*apiv1.Pod) (provReqPods, regularPods []*apiv1.Pod) {
for _, pod := range unschedulablePods {
if _, ok := pod.Annotations[provreq.ProvisioningRequestPodAnnotationKey]; ok {
provReqPods = append(provReqPods, pod)
} else {
regularPods = append(regularPods, pod)
}
}
return
}
// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
// than the configured min size. The source of truth for the current node group
// size is the TargetSize queried directly from cloud providers. Returns
// appropriate status or error if an unexpected error occurred.
func (o *WrapperOrchestrator) ScaleUpToNodeGroupMinSize(
nodes []*apiv1.Node,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
return o.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos)
}

View File

@ -0,0 +1,90 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package orchestrator
import (
"testing"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
const (
provisioningRequestErrorMsg = "provisioningRequestError"
regularPodsErrorMsg = "regularPodsError"
)
func TestScaleUp(t *testing.T) {
o := WrapperOrchestrator{
provReqOrchestrator: &fakeScaleUp{provisioningRequestErrorMsg},
scaleUpOrchestrator: &fakeScaleUp{regularPodsErrorMsg},
}
regularPods := []*apiv1.Pod{
BuildTestPod("pod-1", 1, 100),
BuildTestPod("pod-2", 1, 100),
}
provReqPods := []*apiv1.Pod{
BuildTestPod("pr-pod-1", 1, 100),
BuildTestPod("pr-pod-2", 1, 100),
}
for _, pod := range provReqPods {
pod.Annotations[provreq.ProvisioningRequestPodAnnotationKey] = "true"
}
unschedulablePods := append(regularPods, provReqPods...)
_, err := o.ScaleUp(unschedulablePods, nil, nil, nil)
assert.Equal(t, err.Error(), provisioningRequestErrorMsg)
_, err = o.ScaleUp(unschedulablePods, nil, nil, nil)
assert.Equal(t, err.Error(), regularPodsErrorMsg)
}
type fakeScaleUp struct {
errorMsg string
}
func (f *fakeScaleUp) ScaleUp(
unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
return nil, errors.NewAutoscalerError(errors.InternalError, f.errorMsg)
}
func (f *fakeScaleUp) Initialize(
autoscalingContext *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
taintConfig taints.TaintConfig,
) {
}
func (f *fakeScaleUp) ScaleUpToNodeGroupMinSize(
nodes []*apiv1.Node,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
return nil, nil
}

View File

@@ -29,6 +29,7 @@ import (
 	"time"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
 	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
 	kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
@@ -468,6 +469,15 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 	deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
 	drainabilityRules := rules.Default(deleteOptions)
+	scaleUpOrchestrator := orchestrator.New()
+	if *provisioningRequestsEnabled {
+		kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
+		scaleUpOrchestrator, err = orchestrator.NewWrapperOrchestrator(kubeClient)
+		if err != nil {
+			return nil, err
+		}
+	}
 	opts := core.AutoscalerOptions{
 		AutoscalingOptions: autoscalingOptions,
 		ClusterSnapshot:    clustersnapshot.NewDeltaClusterSnapshot(),
@@ -477,6 +487,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 		PredicateChecker:    predicateChecker,
 		DeleteOptions:       deleteOptions,
 		DrainabilityRules:   drainabilityRules,
+		ScaleUpOrchestrator: scaleUpOrchestrator,
 	}
 	opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)

View File

@@ -28,7 +28,8 @@ import (
 )
 const (
-	provisioningRequestPodAnnotationKey = "cluster-autoscaler.kubernetes.io/consume-provisioning-request"
+	// ProvisioningRequestPodAnnotationKey is an annotation on a pod that indicates the pod was created by a ProvisioningRequest.
+	ProvisioningRequestPodAnnotationKey = "cluster-autoscaler.kubernetes.io/consume-provisioning-request"
 	maxProvReqEvent = 50
 )
@@ -101,6 +102,6 @@ func provisioningRequestName(pod *v1.Pod) (string, bool) {
 	if pod == nil || pod.Annotations == nil {
 		return "", false
 	}
-	provReqName, found := pod.Annotations[provisioningRequestPodAnnotationKey]
+	provReqName, found := pod.Annotations[ProvisioningRequestPodAnnotationKey]
 	return provReqName, found
 }

View File

@@ -31,10 +31,10 @@ import (
 func TestProvisioningRequestPodsFilter(t *testing.T) {
 	prPod1 := BuildTestPod("pr-pod-1", 500, 10)
-	prPod1.Annotations[provisioningRequestPodAnnotationKey] = "pr-class"
+	prPod1.Annotations[ProvisioningRequestPodAnnotationKey] = "pr-class"
 	prPod2 := BuildTestPod("pr-pod-2", 500, 10)
-	prPod2.Annotations[provisioningRequestPodAnnotationKey] = "pr-class-2"
+	prPod2.Annotations[ProvisioningRequestPodAnnotationKey] = "pr-class-2"
 	pod1 := BuildTestPod("pod-1", 500, 10)
 	pod2 := BuildTestPod("pod-2", 500, 10)
@@ -91,7 +91,7 @@ func TestEventManager(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		prPod := BuildTestPod(fmt.Sprintf("pr-pod-%d", i), 10, 10)
-		prPod.Annotations[provisioningRequestPodAnnotationKey] = "pr-class"
+		prPod.Annotations[ProvisioningRequestPodAnnotationKey] = "pr-class"
 		unscheduledPods = append(unscheduledPods, prPod)
 	}
 	got, err := prFilter.Process(ctx, unscheduledPods)

View File

@@ -99,3 +99,10 @@ func (p *NoOpScaleUpStatusProcessor) Process(context *context.AutoscalingContext
 // CleanUp cleans up the processor's internal structures.
 func (p *NoOpScaleUpStatusProcessor) CleanUp() {
 }
+
+// UpdateScaleUpError updates ScaleUpStatus.
+func UpdateScaleUpError(s *ScaleUpStatus, err errors.AutoscalerError) (*ScaleUpStatus, errors.AutoscalerError) {
+	s.ScaleUpError = &err
+	s.Result = ScaleUpError
+	return s, err
+}

View File

@@ -175,9 +175,12 @@ type Detail string
 // The following constants list all currently available Conditions Type values.
 // See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition
 const (
-	// CapacityAvailable indicates that all of the requested resources were
-	// already available in the cluster.
-	CapacityAvailable string = "CapacityAvailable"
+	// CapacityFound indicates that all of the requested resources were
+	// found in the cluster.
+	CapacityFound string = "CapacityFound"
+	// BookingExpired indicates that the ProvisioningRequest previously had the CapacityFound condition
+	// and its reservation time has expired.
+	BookingExpired string = "BookingExpired"
 	// Provisioned indicates that all of the requested resources were created
 	// and are available in the cluster. CA will set this condition when the
 	// VM creation finishes successfully.

View File

@ -0,0 +1,94 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkcapacity
import (
"time"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/klog/v2"
)
const (
defaultReservationTime = 10 * time.Minute
defaultExpirationTime = 7 * 24 * time.Hour // 7 days
)
const (
//CapacityIsNotFoundReason is added when capacity was not found in the cluster.
CapacityIsNotFoundReason = "CapacityIsNotFound"
//CapacityIsFoundReason is added when capacity was found in the cluster.
CapacityIsFoundReason = "CapacityIsFound"
//FailedToBookCapacityReason is added when Cluster Autoscaler failed to book capacity in the cluster.
FailedToBookCapacityReason = "FailedToBookCapacity"
)
func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool {
if pr.V1Beta1().Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity {
return false
}
if pr.Conditions() == nil || len(pr.Conditions()) == 0 {
return false
}
book := false
for _, condition := range pr.Conditions() {
if checkConditionType(condition, v1beta1.BookingExpired) || checkConditionType(condition, v1beta1.Failed) {
return false
} else if checkConditionType(condition, v1beta1.CapacityFound) {
book = true
}
}
return book
}
func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string) {
var newConditions []v1.Condition
newCondition := v1.Condition{
Type: conditionType,
Status: conditionStatus,
ObservedGeneration: pr.V1Beta1().GetObjectMeta().GetGeneration(),
LastTransitionTime: v1.Now(),
Reason: reason,
Message: message,
}
prevConditions := pr.Conditions()
switch conditionType {
case v1beta1.CapacityFound, v1beta1.BookingExpired, v1beta1.Failed:
conditionFound := false
for _, condition := range prevConditions {
if condition.Type == conditionType {
conditionFound = true
newConditions = append(newConditions, newCondition)
} else {
newConditions = append(newConditions, condition)
}
}
if !conditionFound {
newConditions = append(prevConditions, newCondition)
}
default:
klog.Errorf("Unknown (conditionType; conditionStatus) pair: (%s; %s) ", conditionType, conditionStatus)
newConditions = prevConditions
}
pr.SetConditions(newConditions)
}
func checkConditionType(condition v1.Condition, conditionType string) bool {
return condition.Type == conditionType && condition.Status == v1.ConditionTrue
}
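
To make the booking rules above concrete, here is a minimal in-package sketch (the expiry reason string is illustrative; the example mirrors the tests in the next file): CapacityFound=True makes a check-capacity request eligible for booking, while BookingExpired or Failed ends it.

func exampleBookingLifecycle() {
	pr := provreqwrapper.NewV1Beta1ProvisioningRequest(
		&v1beta1.ProvisioningRequest{
			Spec: v1beta1.ProvisioningRequestSpec{
				ProvisioningClassName: v1beta1.ProvisioningClassCheckCapacity,
			},
		}, nil)

	// No conditions yet: nothing to book.
	_ = shouldCapacityBeBooked(pr) // false

	// Capacity was found: keep the capacity booked for this request.
	setCondition(pr, v1beta1.CapacityFound, v1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.")
	_ = shouldCapacityBeBooked(pr) // true

	// Once BookingExpired (or Failed) is set, the capacity is no longer booked.
	setCondition(pr, v1beta1.BookingExpired, v1.ConditionTrue, "Expired", "Reservation time is over.")
	_ = shouldCapacityBeBooked(pr) // false
}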

View File

@ -0,0 +1,274 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkcapacity
import (
"testing"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)
func TestBookCapacity(t *testing.T) {
tests := []struct {
name string
prConditions []v1.Condition
want bool
}{
{
name: "BookingExpired",
prConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
{
Type: v1beta1.BookingExpired,
Status: v1.ConditionTrue,
},
},
want: false,
},
{
name: "Failed",
prConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
{
Type: v1beta1.Failed,
Status: v1.ConditionTrue,
},
},
want: false,
},
{
name: "empty conditions",
want: false,
},
{
name: "Capacity found and provisioned",
prConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
{
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
},
want: true,
},
{
name: "Capacity is not found",
prConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionFalse,
},
},
want: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pr := provreqwrapper.NewV1Beta1ProvisioningRequest(
&v1beta1.ProvisioningRequest{
Spec: v1beta1.ProvisioningRequestSpec{
ProvisioningClassName: v1beta1.ProvisioningClassCheckCapacity,
},
Status: v1beta1.ProvisioningRequestStatus{
Conditions: test.prConditions,
},
}, nil)
got := shouldCapacityBeBooked(pr)
if got != test.want {
t.Errorf("Want: %v, got: %v", test.want, got)
}
})
}
}
func TestSetCondition(t *testing.T) {
tests := []struct {
name string
oldConditions []v1.Condition
newType string
newStatus v1.ConditionStatus
want []v1.Condition
}{
{
name: "CapacityFound added, empty conditions before",
newType: v1beta1.CapacityFound,
newStatus: v1.ConditionTrue,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
},
},
{
name: "CapacityFound updated",
oldConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionFalse,
},
},
newType: v1beta1.CapacityFound,
newStatus: v1.ConditionTrue,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
},
},
{
name: "Failed added, non-empty conditions before",
oldConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
},
newType: v1beta1.Failed,
newStatus: v1.ConditionTrue,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
{
Type: v1beta1.Failed,
Status: v1.ConditionTrue,
},
},
},
{
name: "Provisioned condition type, conditions are not updated",
oldConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
},
newType: v1beta1.Provisioned,
newStatus: v1.ConditionFalse,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
},
},
{
name: "Unknown condition status, conditions are updated",
oldConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
},
newType: v1beta1.Failed,
newStatus: v1.ConditionUnknown,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
{
Type: v1beta1.Failed,
Status: v1.ConditionUnknown,
},
},
},
{
name: "Unknown condition type, conditions are not updated",
oldConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
},
newType: "Unknown",
newStatus: v1.ConditionTrue,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
},
},
{
name: "BookingExpired, empty conditions before",
newType: v1beta1.BookingExpired,
newStatus: v1.ConditionFalse,
want: []v1.Condition{
{
Type: v1beta1.BookingExpired,
Status: v1.ConditionFalse,
},
},
},
{
name: "Capacity found with unknown condition before",
oldConditions: []v1.Condition{
{
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
},
newType: v1beta1.CapacityFound,
newStatus: v1.ConditionTrue,
want: []v1.Condition{
{
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pr := provreqwrapper.NewV1Beta1ProvisioningRequest(
&v1beta1.ProvisioningRequest{
Status: v1beta1.ProvisioningRequestStatus{
Conditions: test.oldConditions,
},
}, nil)
setCondition(pr, test.newType, test.newStatus, "", "")
got := pr.Conditions()
if len(got) > 2 || len(got) != len(test.want) || got[0].Type != test.want[0].Type || got[0].Status != test.want[0].Status {
t.Errorf("want %v, got: %v", test.want, got)
}
if len(got) == 2 {
if got[1].Type != test.want[1].Type || got[1].Status != test.want[1].Status {
t.Errorf("want %v, got: %v", test.want, got)
}
}
})
}
}

View File

@ -0,0 +1,177 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkcapacity
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
type provisioningRequestClient interface {
ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error)
ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
}
type provReqOrchestrator struct {
initialized bool
context *context.AutoscalingContext
client provisioningRequestClient
injector *scheduling.HintingSimulator
}
// New returns a new orchestrator.
func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &provReqOrchestrator{client: client}, nil
}
// Initialize initializes the orchestrator.
func (o *provReqOrchestrator) Initialize(
autoscalingContext *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
taintConfig taints.TaintConfig,
) {
o.initialized = true
o.context = autoscalingContext
o.injector = scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker)
}
// ScaleUp returns whether there is capacity in the cluster for pods from a ProvisioningRequest.
func (o *provReqOrchestrator) ScaleUp(
unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !o.initialized {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
}
if len(unschedulablePods) == 0 {
return &status.ScaleUpStatus{}, nil
}
if _, err := o.verifyProvisioningRequestClass(unschedulablePods); err != nil {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error()))
}
o.context.ClusterSnapshot.Fork()
defer o.context.ClusterSnapshot.Revert()
if err := o.bookCapacity(); err != nil {
return nil, errors.NewAutoscalerError(errors.InternalError, err.Error())
}
scaleUpIsSuccessful, err := o.scaleUp(unschedulablePods)
if err != nil {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error()))
}
if scaleUpIsSuccessful {
return &status.ScaleUpStatus{Result: status.ScaleUpSuccessful}, nil
}
return &status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable}, nil
}
// ScaleUpToNodeGroupMinSize is no-op.
func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize(
nodes []*apiv1.Node,
nodeInfos map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) {
return nil, nil
}
func (o *provReqOrchestrator) bookCapacity() error {
provReqs, err := o.client.ProvisioningRequests()
if err != nil {
return fmt.Errorf("Couldn't fetch ProvisioningRequests in the cluster: %v", err)
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if shouldCapacityBeBooked(provReq) {
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
if err != nil {
// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
// If there is an error, mark PR as invalid, because we won't be able to book capacity
// for it anyway.
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err))
continue
}
podsToCreate = append(podsToCreate, pods...)
}
}
if len(podsToCreate) == 0 {
return nil
}
// scheduling the pods to reserve capacity for provisioning request with BookCapacity condition
if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
klog.Warningf("Error during capacity booking: %v", err)
}
return nil
}
// Assuming that all unschedulable pods come from one ProvisioningRequest.
func (o *provReqOrchestrator) scaleUp(unschedulablePods []*apiv1.Pod) (bool, error) {
provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name)
if err != nil {
return false, fmt.Errorf("Failed retrive ProvisioningRequest from unscheduled pods, err: %v", err)
}
st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true)
if len(st) < len(unschedulablePods) || err != nil {
setCondition(provReq, v1beta1.CapacityFound, metav1.ConditionFalse, CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.")
return false, err
}
setCondition(provReq, v1beta1.CapacityFound, metav1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.")
return true, nil
}
// verifyProvisioningRequestClass checks that all pods belong to one ProvisioningRequest of the check-capacity ProvisioningRequest class.
func (o *provReqOrchestrator) verifyProvisioningRequestClass(unschedulablePods []*apiv1.Pod) (*provreqwrapper.ProvisioningRequest, error) {
provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name)
if err != nil {
return nil, fmt.Errorf("Failed retrive ProvisioningRequest from unscheduled pods, err: %v", err)
}
if provReq.V1Beta1().Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity {
return nil, fmt.Errorf("ProvisioningRequestClass is not %s", v1beta1.ProvisioningClassCheckCapacity)
}
for _, pod := range unschedulablePods {
if pod.Namespace != unschedulablePods[0].Namespace {
return nil, fmt.Errorf("Pods %s and %s are from different namespaces", pod.Name, unschedulablePods[0].Name)
}
if pod.OwnerReferences[0].Name != unschedulablePods[0].OwnerReferences[0].Name {
return nil, fmt.Errorf("Pods %s and %s have different OwnerReference", pod.Name, unschedulablePods[0].Name)
}
}
return provReq, nil
}

View File

@ -0,0 +1,125 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkcapacity
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
func TestScaleUp(t *testing.T) {
allNodes := []*apiv1.Node{}
for i := 0; i < 100; i++ {
name := fmt.Sprintf("test-cpu-node-%d", i)
node := BuildTestNode(name, 100, 10)
allNodes = append(allNodes, node)
}
for i := 0; i < 100; i++ {
name := fmt.Sprintf("test-mem-node-%d", i)
node := BuildTestNode(name, 1, 1000)
allNodes = append(allNodes, node)
}
newCpuProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "newCpuProvReq", "5m", "5", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
newMemProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "newMemProvReq", "1m", "100", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
bookedCapacityProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "bookedCapacity", "1m", "200", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
bookedCapacityProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.CapacityFound, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}})
expiredProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "bookedCapacity", "1m", "200", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
expiredProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.BookingExpired, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}})
differentProvReqClass := provreqwrapper.BuildTestProvisioningRequest("ns", "differentProvReqClass", "1", "1", "", int32(5), false, time.Now(), v1beta1.ProvisioningClassAtomicScaleUp)
testCases := []struct {
name string
provReqs []*provreqwrapper.ProvisioningRequest
provReqToScaleUp *provreqwrapper.ProvisioningRequest
scaleUpResult status.ScaleUpResult
err bool
}{
{
name: "no ProvisioningRequests",
provReqs: []*provreqwrapper.ProvisioningRequest{},
},
{
name: "one ProvisioningRequest",
provReqs: []*provreqwrapper.ProvisioningRequest{newCpuProvReq},
provReqToScaleUp: newCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
},
{
name: "capacity in the cluster is booked",
provReqs: []*provreqwrapper.ProvisioningRequest{newMemProvReq, bookedCapacityProvReq},
provReqToScaleUp: newMemProvReq,
scaleUpResult: status.ScaleUpNoOptionsAvailable,
},
{
name: "pods from different ProvisioningRequest class",
provReqs: []*provreqwrapper.ProvisioningRequest{newCpuProvReq, bookedCapacityProvReq, differentProvReqClass},
provReqToScaleUp: differentProvReqClass,
err: true,
},
{
name: "some capacity is booked, succesfull ScaleUp",
provReqs: []*provreqwrapper.ProvisioningRequest{newCpuProvReq, bookedCapacityProvReq, differentProvReqClass},
provReqToScaleUp: newCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
},
}
for _, tc := range testCases {
tc := tc
allNodes := allNodes
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
provider := testprovider.NewTestCloudProvider(nil, nil)
autoscalingContext, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, allNodes, nil)
prPods, err := pods.PodsForProvisioningRequest(tc.provReqToScaleUp)
assert.NoError(t, err)
orchestrator := &provReqOrchestrator{
initialized: true,
context: &autoscalingContext,
client: provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...),
injector: scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker),
}
st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{})
if !tc.err {
assert.NoError(t, err)
assert.Equal(t, tc.scaleUpResult, st.Result)
} else {
assert.Error(t, err)
}
})
}
}

View File

@ -0,0 +1,84 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pods
import (
"fmt"
"google.golang.org/protobuf/proto"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/kubernetes/pkg/controller"
)
const (
// ProvisioningRequestPodAnnotationKey is a key used to annotate pods consuming provisioning request.
ProvisioningRequestPodAnnotationKey = "cluster-autoscaler.kubernetes.io/consume-provisioning-request"
// ProvisioningClassPodAnnotationKey is a key used to add annotation about Provisioning Class
ProvisioningClassPodAnnotationKey = "cluster-autoscaler.kubernetes.io/provisioning-class-name"
)
// PodsForProvisioningRequest returns a list of pods for which Provisioning
// Request needs to provision resources.
func PodsForProvisioningRequest(pr *provreqwrapper.ProvisioningRequest) ([]*v1.Pod, error) {
if pr == nil {
return nil, nil
}
podSets, err := pr.PodSets()
if err != nil {
return nil, err
}
pods := make([]*v1.Pod, 0)
for i, podSet := range podSets {
for j := 0; j < int(podSet.Count); j++ {
pod, err := controller.GetPodFromTemplate(&podSet.PodTemplate, pr.RuntimeObject(), ownerReference(pr))
if err != nil {
return nil, fmt.Errorf("while creating pod for pr: %s/%s podSet: %d, got error: %w", pr.Namespace(), pr.Name(), i, err)
}
populatePodFields(pr, pod, i, j)
pods = append(pods, pod)
}
}
return pods, nil
}
// ownerReference injects owner reference that points to the ProvReq object.
// This allows CA to group the pods as coming from one controller and simplifies
// the scale-up simulation logic and number of logs lines emitted.
func ownerReference(pr *provreqwrapper.ProvisioningRequest) *metav1.OwnerReference {
return &metav1.OwnerReference{
APIVersion: pr.APIVersion(),
Kind: pr.Kind(),
Name: pr.Name(),
UID: pr.UID(),
Controller: proto.Bool(true),
}
}
func populatePodFields(pr *provreqwrapper.ProvisioningRequest, pod *v1.Pod, i, j int) {
pod.Name = fmt.Sprintf("%s%d-%d", pod.GenerateName, i, j)
pod.Namespace = pr.Namespace()
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[ProvisioningRequestPodAnnotationKey] = pr.Name()
pod.Annotations[ProvisioningClassPodAnnotationKey] = pr.V1Beta1().Spec.ProvisioningClassName
pod.UID = types.UID(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
pod.CreationTimestamp = pr.CreationTimestamp()
}
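
As a quick illustration of the expansion above (a hedged sketch; the request name and sizes are made up, and it leans on the BuildTestProvisioningRequest helper added later in this commit): a single PodSet with Count 3 yields in-memory pods named <request-name>-0-0, <request-name>-0-1 and <request-name>-0-2, each annotated with the request it consumes.

package main

import (
	"fmt"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)

func main() {
	// Hypothetical request: one PodSet with 3 replicas of a 100m CPU / 100Mi memory pod.
	pr := provreqwrapper.BuildTestProvisioningRequest("default", "demo-pr", "100m", "100Mi", "",
		int32(3), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
	prPods, err := pods.PodsForProvisioningRequest(pr)
	if err != nil {
		panic(err)
	}
	for _, p := range prPods {
		// Expected names follow "<generateName><podSet>-<replica>", e.g. demo-pr-0-0.
		fmt.Println(p.Name, p.Annotations[pods.ProvisioningRequestPodAnnotationKey])
	}
}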

View File

@ -0,0 +1,269 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pods
import (
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/proto"
// "google.golang.org/protobuf/testing/protocmp"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)
const testProvisioningClassName = "TestProvisioningClass"
func TestPodsForProvisioningRequest(t *testing.T) {
testPod := func(name, genName, containerName, containerImage, prName string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
GenerateName: genName,
Namespace: "test-namespace",
UID: types.UID(fmt.Sprintf("test-namespace/%s", name)),
Annotations: map[string]string{
ProvisioningRequestPodAnnotationKey: prName,
ProvisioningClassPodAnnotationKey: testProvisioningClassName,
},
Labels: map[string]string{},
Finalizers: []string{},
OwnerReferences: []metav1.OwnerReference{
{
Controller: proto.Bool(true),
Name: prName,
},
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: containerName,
Image: containerImage,
},
},
},
}
}
tests := []struct {
desc string
pr *v1beta1.ProvisioningRequest
podTemplates []*apiv1.PodTemplate
want []*v1.Pod
wantErr bool
}{
{
desc: "simple ProvReq",
pr: &v1beta1.ProvisioningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pr-name",
Namespace: "test-namespace",
},
Spec: v1beta1.ProvisioningRequestSpec{
PodSets: []v1beta1.PodSet{
{
Count: 1,
PodTemplateRef: v1beta1.Reference{Name: "template-1"},
},
},
ProvisioningClassName: testProvisioningClassName,
},
},
podTemplates: []*apiv1.PodTemplate{
{
ObjectMeta: metav1.ObjectMeta{
Name: "template-1",
Namespace: "test-namespace",
},
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "test-container",
Image: "test-image",
},
},
},
},
},
},
want: []*v1.Pod{
testPod("test-pr-name-0-0", "test-pr-name-", "test-container", "test-image", "test-pr-name"),
},
},
{
desc: "ProvReq already having taint",
pr: &v1beta1.ProvisioningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pr-name",
Namespace: "test-namespace",
},
Spec: v1beta1.ProvisioningRequestSpec{
PodSets: []v1beta1.PodSet{
{
Count: 1,
PodTemplateRef: v1beta1.Reference{Name: "template-1"},
},
},
ProvisioningClassName: testProvisioningClassName,
},
},
podTemplates: []*apiv1.PodTemplate{
{
ObjectMeta: metav1.ObjectMeta{
Name: "template-1",
Namespace: "test-namespace",
},
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "test-container",
Image: "test-image",
},
},
},
},
},
},
want: []*v1.Pod{
testPod("test-pr-name-0-0", "test-pr-name-", "test-container", "test-image", "test-pr-name"),
},
},
{
desc: "ProvReq already having nodeSelector",
pr: &v1beta1.ProvisioningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pr-name",
Namespace: "test-namespace",
},
Spec: v1beta1.ProvisioningRequestSpec{
PodSets: []v1beta1.PodSet{
{
Count: 1,
PodTemplateRef: v1beta1.Reference{Name: "template-1"},
},
},
ProvisioningClassName: testProvisioningClassName,
},
},
podTemplates: []*v1.PodTemplate{
{
ObjectMeta: metav1.ObjectMeta{
Name: "template-1",
Namespace: "test-namespace",
},
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "test-container",
Image: "test-image",
},
},
},
},
},
},
want: []*v1.Pod{
testPod("test-pr-name-0-0", "test-pr-name-", "test-container", "test-image", "test-pr-name"),
},
},
{
desc: "ProvReq with multiple pod sets",
pr: &v1beta1.ProvisioningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pr-name",
Namespace: "test-namespace",
},
Spec: v1beta1.ProvisioningRequestSpec{
PodSets: []v1beta1.PodSet{
{
Count: 2,
PodTemplateRef: v1beta1.Reference{Name: "template-1"},
},
{
Count: 3,
PodTemplateRef: v1beta1.Reference{Name: "template-2"},
},
},
ProvisioningClassName: testProvisioningClassName,
},
},
podTemplates: []*v1.PodTemplate{
{
ObjectMeta: metav1.ObjectMeta{
Name: "template-1",
Namespace: "test-namespace",
},
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "test-container",
Image: "test-image",
},
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "template-2",
Namespace: "test-namespace",
},
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "test-container-2",
Image: "test-image-2",
},
},
},
},
},
},
want: []*v1.Pod{
testPod("test-pr-name-0-0", "test-pr-name-", "test-container", "test-image", "test-pr-name"),
testPod("test-pr-name-0-1", "test-pr-name-", "test-container", "test-image", "test-pr-name"),
testPod("test-pr-name-1-0", "test-pr-name-", "test-container-2", "test-image-2", "test-pr-name"),
testPod("test-pr-name-1-1", "test-pr-name-", "test-container-2", "test-image-2", "test-pr-name"),
testPod("test-pr-name-1-2", "test-pr-name-", "test-container-2", "test-image-2", "test-pr-name"),
},
},
}
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
got, err := PodsForProvisioningRequest(provreqwrapper.NewV1Beta1ProvisioningRequest(tc.pr, tc.podTemplates))
if (err != nil) != tc.wantErr {
t.Errorf("PodsForProvisioningRequest() error = %v, wantErr %v", err, tc.wantErr)
return
}
if diff := cmp.Diff(got, tc.want); diff != "" {
t.Errorf("unexpected response from PodsForProvisioningRequest(), diff (-want +got): %v", diff)
}
})
}
}

View File

@@ -30,7 +30,7 @@ func TestFetchPodTemplates(t *testing.T) {
 	mockProvisioningRequests := []*provreqwrapper.ProvisioningRequest{pr1, pr2}
 	ctx := context.Background()
-	c, _ := NewFakeProvisioningRequestClient(ctx, t, mockProvisioningRequests...)
+	c := NewFakeProvisioningRequestClient(ctx, t, mockProvisioningRequests...)
 	got, err := c.FetchPodTemplates(pr1.V1Beta1())
 	if err != nil {
 		t.Errorf("provisioningRequestClient.ProvisioningRequests() error: %v", err)

View File

@@ -35,7 +35,7 @@ import (
 )
 // NewFakeProvisioningRequestClient mock ProvisioningRequestClient for tests.
-func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) (*ProvisioningRequestClientV1beta1, *FakeProvisioningRequestForceClient) {
+func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *ProvisioningRequestClientV1beta1 {
 	t.Helper()
 	provReqClient := fake.NewSimpleClientset()
 	podTemplClient := fake_kubernetes.NewSimpleClientset()
@@ -64,23 +64,9 @@ func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...
 		client:         provReqClient,
 		provReqLister:  provReqLister,
 		podTemplLister: podTemplLister,
-	}, &FakeProvisioningRequestForceClient{
-		client: provReqClient,
 	}
 }
-
-// FakeProvisioningRequestForceClient that allows to skip cache.
-type FakeProvisioningRequestForceClient struct {
-	client *fake.Clientset
-}
-
-// ProvisioningRequest gets a specific ProvisioningRequest CR, skipping cache.
-func (c *FakeProvisioningRequestForceClient) ProvisioningRequest(namespace, name string) (*v1beta1.ProvisioningRequest, error) {
-	ctx, cancel := context.WithTimeout(context.Background(), provisioningRequestClientCallTimeout)
-	defer cancel()
-	return c.client.AutoscalingV1beta1().ProvisioningRequests(namespace).Get(ctx, name, metav1.GetOptions{})
-}
-
 // newFakePodTemplatesLister creates a fake lister for the Pod Templates in the cluster.
 func newFakePodTemplatesLister(t *testing.T, client kubernetes.Interface, channel <-chan struct{}) (v1.PodTemplateLister, error) {
 	t.Helper()
View File

@ -0,0 +1,118 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provreqwrapper
import (
"fmt"
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
)
// BuildTestProvisioningRequest builds ProvisioningRequest wrapper.
func BuildTestProvisioningRequest(namespace, name, cpu, memory, gpu string, podCount int32,
antiAffinity bool, creationTimestamp time.Time, provisioningRequestClass string) *ProvisioningRequest {
gpuResource := resource.Quantity{}
tolerations := []apiv1.Toleration{}
if len(gpu) > 0 {
gpuResource = resource.MustParse(gpu)
tolerations = append(tolerations, apiv1.Toleration{Key: "nvidia.com/gpu", Operator: apiv1.TolerationOpExists})
}
affinity := &apiv1.Affinity{}
if antiAffinity {
affinity.PodAntiAffinity = &apiv1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []apiv1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"test-app"},
},
},
},
TopologyKey: "failure-domain.beta.kubernetes.io/zone",
},
},
}
}
return NewV1Beta1ProvisioningRequest(
&v1beta1.ProvisioningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
CreationTimestamp: v1.NewTime(creationTimestamp),
},
Spec: v1beta1.ProvisioningRequestSpec{
ProvisioningClassName: provisioningRequestClass,
PodSets: []v1beta1.PodSet{
{
PodTemplateRef: v1beta1.Reference{Name: fmt.Sprintf("%s-template-name", name)},
Count: podCount,
},
},
},
Status: v1beta1.ProvisioningRequestStatus{
Conditions: []metav1.Condition{},
},
},
[]*apiv1.PodTemplate{
{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-template-name", name),
Namespace: namespace,
CreationTimestamp: v1.NewTime(creationTimestamp),
},
Template: apiv1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "test-app",
},
},
Spec: apiv1.PodSpec{
Tolerations: tolerations,
Affinity: affinity,
Containers: []apiv1.Container{
{
Name: "pi",
Image: "perl",
Command: []string{"/bin/sh"},
Resources: apiv1.ResourceRequirements{
Limits: apiv1.ResourceList{
apiv1.ResourceCPU: resource.MustParse(cpu),
apiv1.ResourceMemory: resource.MustParse(memory),
"nvidia.com/gpu": gpuResource,
},
Requests: apiv1.ResourceList{
apiv1.ResourceCPU: resource.MustParse(cpu),
apiv1.ResourceMemory: resource.MustParse(memory),
"nvidia.com/gpu": gpuResource,
},
},
},
},
},
},
},
})
}

View File

@@ -35,11 +35,11 @@ const (
 // CreateKubeClient creates kube client based on AutoscalingOptions.KubeClientOptions
 func CreateKubeClient(opts config.KubeClientOptions) kube_client.Interface {
-	return kube_client.NewForConfigOrDie(getKubeConfig(opts))
+	return kube_client.NewForConfigOrDie(GetKubeConfig(opts))
 }

-// getKubeConfig returns the rest config from AutoscalingOptions.KubeClientOptions.
-func getKubeConfig(opts config.KubeClientOptions) *rest.Config {
+// GetKubeConfig returns the rest config from AutoscalingOptions.KubeClientOptions.
+func GetKubeConfig(opts config.KubeClientOptions) *rest.Config {
 	var kubeConfig *rest.Config
 	var err error