Add ProvisioningRequestProcessor (#6488)

Yaroslava Serdiuk 2024-02-14 15:14:46 +02:00 committed by GitHub
parent 947cd3fbfb
commit 5286b3f770
18 changed files with 454 additions and 81 deletions

View File

@ -30,6 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/factory"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
@ -53,6 +54,7 @@ type AutoscalerOptions struct {
ExpanderStrategy expander.Strategy
EstimatorBuilder estimator.EstimatorBuilder
Processors *ca_processors.AutoscalingProcessors
LoopStartNotifier *loopstart.ObserversList
Backoff backoff.Backoff
DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
RemainingPdbTracker pdb.RemainingPdbTracker
@ -84,6 +86,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
opts.ClusterSnapshot,
opts.AutoscalingKubeClients,
opts.Processors,
opts.LoopStartNotifier,
opts.CloudProvider,
opts.ExpanderStrategy,
opts.EstimatorBuilder,
@ -101,6 +104,9 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
if opts.Processors == nil {
opts.Processors = ca_processors.DefaultProcessors(opts.AutoscalingOptions)
}
if opts.LoopStartNotifier == nil {
opts.LoopStartNotifier = loopstart.NewObserversList(nil)
}
if opts.AutoscalingKubeClients == nil {
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
}
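
The new LoopStartNotifier field is optional: callers can inject their own observers, and initializeDefaultOptions falls back to an empty list otherwise. A minimal sketch of injecting a custom observer, assuming only the AutoscalerOptions field and loopstart constructor added in this diff (auditObserver is hypothetical, and all unrelated options are elided):

package main

import (
	"k8s.io/autoscaler/cluster-autoscaler/core"
	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
)

// auditObserver is a hypothetical observer that counts CA loops.
type auditObserver struct{ loops int }

func (a *auditObserver) Refresh() { a.loops++ }

func buildOptions() core.AutoscalerOptions {
	return core.AutoscalerOptions{
		// Overrides the empty list installed by initializeDefaultOptions.
		LoopStartNotifier: loopstart.NewObserversList([]loopstart.Observer{&auditObserver{}}),
	}
}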

View File

@ -47,6 +47,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
@ -90,6 +91,7 @@ type StaticAutoscaler struct {
scaleDownActuator scaledown.Actuator
scaleUpOrchestrator scaleup.Orchestrator
processors *ca_processors.AutoscalingProcessors
loopStartNotifier *loopstart.ObserversList
processorCallbacks *staticAutoscalerProcessorCallbacks
initialized bool
taintConfig taints.TaintConfig
@ -136,6 +138,7 @@ func NewStaticAutoscaler(
clusterSnapshot clustersnapshot.ClusterSnapshot,
autoscalingKubeClients *context.AutoscalingKubeClients,
processors *ca_processors.AutoscalingProcessors,
loopStartNotifier *loopstart.ObserversList,
cloudProvider cloudprovider.CloudProvider,
expanderStrategy expander.Strategy,
estimatorBuilder estimator.EstimatorBuilder,
@ -205,6 +208,7 @@ func NewStaticAutoscaler(
scaleDownActuator: scaleDownActuator,
scaleUpOrchestrator: scaleUpOrchestrator,
processors: processors,
loopStartNotifier: loopStartNotifier,
processorCallbacks: processorCallbacks,
clusterStateRegistry: clusterStateRegistry,
taintConfig: taintConfig,
@ -337,6 +341,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
klog.Errorf("Failed to refresh cloud provider config: %v", err)
return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
}
a.loopStartNotifier.Refresh()
// Update node groups min/max and maximum number of nodes being set for all node groups after cloud provider refresh
maxNodesCount := 0

View File

@ -42,6 +42,7 @@ import (
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
@ -281,6 +282,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}
@ -374,6 +376,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
initialized: true,
}
@ -573,6 +576,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
initialized: true,
}
@ -798,6 +802,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
initialized: true,
}
@ -948,6 +953,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}
@ -1096,6 +1102,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}
@ -1224,6 +1231,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T)
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}
@ -1322,6 +1330,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}
@ -1427,6 +1436,7 @@ func TestStaticAutoscalerRunOnceWithUnselectedNodeGroups(t *testing.T) {
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: NewTestProcessors(&context),
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}
@ -2023,6 +2033,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
scaleDownActuator: actuator,
scaleDownPlanner: planner,
processors: NewTestProcessors(&ctx),
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}

View File

@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
@ -49,6 +50,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
@ -469,15 +471,6 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)
scaleUpOrchestrator := orchestrator.New()
if *provisioningRequestsEnabled {
kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err = orchestrator.NewWrapperOrchestrator(kubeClient)
if err != nil {
return nil, err
}
}
opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(),
@ -487,14 +480,27 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
PredicateChecker: predicateChecker,
DeleteOptions: deleteOptions,
DrainabilityRules: drainabilityRules,
ScaleUpOrchestrator: scaleUpOrchestrator,
ScaleUpOrchestrator: orchestrator.New(),
}
opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)
if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))
restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(restConfig)
if err != nil {
return nil, err
}
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcessor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()})
if err != nil {
return nil, err
}
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcessor})
}
opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
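
When ProvisioningRequestEnabled is set, the wiring above swaps in the wrapper orchestrator and registers the combined processor as a loop-start observer. Additional ProvisioningClasses would slot into the same processor list; a sketch, where atomicscaleup.NewProcessor is hypothetical (checkcapacity is the only class shipped in this commit):

provreqProcessor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{
	checkcapacity.NewCheckCapacityProcessor(),
	atomicscaleup.NewProcessor(), // hypothetical processor for a second class
})
if err != nil {
	return nil, err
}
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcessor})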

View File

@ -0,0 +1,40 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package loopstart
// Observer is implemented by objects that need to be refreshed at the start
// of each CA loop.
type Observer interface {
Refresh()
}
// ObserversList stores observers that need to be refreshed at the start of each CA loop.
type ObserversList struct {
observers []Observer
}
// Refresh refreshes all stored observers; it is invoked once per CA loop.
func (l *ObserversList) Refresh() {
for _, observer := range l.observers {
observer.Refresh()
}
}
// NewObserversList returns a new ObserversList wrapping the given observers.
func NewObserversList(observers []Observer) *ObserversList {
return &ObserversList{observers}
}
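
A minimal usage sketch of this package, assuming only the Observer interface and constructor above (loopLogger is hypothetical):

package main

import (
	"fmt"

	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
)

// loopLogger is a hypothetical Observer that logs each loop start.
type loopLogger struct{ count int }

func (l *loopLogger) Refresh() {
	l.count++
	fmt.Printf("CA loop #%d started\n", l.count)
}

func main() {
	observers := loopstart.NewObserversList([]loopstart.Observer{&loopLogger{}})
	// StaticAutoscaler.RunOnce invokes this once per loop, right after the
	// cloud provider refresh.
	observers.Refresh()
}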

View File

@ -0,0 +1,67 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provreq
import (
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
)
// ProvisioningRequestProcessor processes ProvisioningRequests in the cluster.
type ProvisioningRequestProcessor interface {
Process([]*provreqwrapper.ProvisioningRequest)
CleanUp()
}
type provisioningRequestClient interface {
ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error)
ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
}
// CombinedProvReqProcessor is responsible for processing ProvisioningRequests for each ProvisioningClass
// every CA loop and for updating the conditions of expired ProvisioningRequests.
type CombinedProvReqProcessor struct {
client provisioningRequestClient
processors []ProvisioningRequestProcessor
}
// NewCombinedProvReqProcessor returns a new CombinedProvReqProcessor.
func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (loopstart.Observer, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &CombinedProvReqProcessor{client: client, processors: processors}, nil
}
// Refresh iterates over ProvisioningRequests and updates their conditions/state.
func (cp *CombinedProvReqProcessor) Refresh() {
provReqs, err := cp.client.ProvisioningRequests()
if err != nil {
klog.Errorf("Failed to get ProvisioningRequests list, err: %v", err)
return
}
for _, p := range cp.processors {
p.Process(provReqs)
}
}
// CleanUp cleans up internal state.
func (cp *CombinedProvReqProcessor) CleanUp() {}
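
Any ProvisioningClass can plug into CombinedProvReqProcessor by satisfying the two-method interface above. A sketch reusing this file's imports (noopProcessor is hypothetical; checkcapacity provides the one real implementation in this commit):

// noopProcessor logs the requests it sees and keeps no state.
type noopProcessor struct{}

func (noopProcessor) Process(prs []*provreqwrapper.ProvisioningRequest) {
	klog.V(4).Infof("observed %d ProvisioningRequests", len(prs))
}

func (noopProcessor) CleanUp() {}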

View File

@ -175,11 +175,8 @@ type Detail string
// The following constants list all currently available Conditions Type values.
// See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition
const (
// CapacityFound indicates that all of the requested resources were
// found in the cluster.
CapacityFound string = "CapacityFound"
// Expired indicates that the ProvisioningRequest had CapacityFound condition before
// and the reservation time is expired.
// BookingExpired indicates that the ProvisioningRequest had Provisioned condition before
// and capacity reservation time is expired.
BookingExpired string = "BookingExpired"
// Provisioned indicates that all of the requested resources were created
// and are available in the cluster. CA will set this condition when the

View File

@ -17,19 +17,12 @@ limitations under the License.
package checkcapacity
import (
"time"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/klog/v2"
)
const (
defaultReservationTime = 10 * time.Minute
defaultExpirationTime = 7 * 24 * time.Hour // 7 days
)
const (
//CapacityIsNotFoundReason is added when capacity was not found in the cluster.
CapacityIsNotFoundReason = "CapacityIsNotFound"
@ -37,6 +30,14 @@ const (
CapacityIsFoundReason = "CapacityIsFound"
//FailedToBookCapacityReason is added when Cluster Autoscaler failed to book capacity in the cluster.
FailedToBookCapacityReason = "FailedToBookCapacity"
//CapacityReservationTimeExpiredReason is added when capacity reservation time is expired.
CapacityReservationTimeExpiredReason = "CapacityReservationTimeExpired"
//CapacityReservationTimeExpiredMsg is added if capacity reservation time is expired.
CapacityReservationTimeExpiredMsg = "Capacity reservation time is expired"
//ExpiredReason is added if ProvisioningRequest is expired.
ExpiredReason = "Expired"
//ExpiredMsg is added if ProvisioningRequest is expired.
ExpiredMsg = "ProvisioningRequest is expired"
)
func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool {
@ -50,26 +51,26 @@ func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool {
for _, condition := range pr.Conditions() {
if checkConditionType(condition, v1beta1.BookingExpired) || checkConditionType(condition, v1beta1.Failed) {
return false
} else if checkConditionType(condition, v1beta1.CapacityFound) {
} else if checkConditionType(condition, v1beta1.Provisioned) {
book = true
}
}
return book
}
func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string) {
func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string, now v1.Time) {
var newConditions []v1.Condition
newCondition := v1.Condition{
Type: conditionType,
Status: conditionStatus,
ObservedGeneration: pr.V1Beta1().GetObjectMeta().GetGeneration(),
LastTransitionTime: v1.Now(),
LastTransitionTime: now,
Reason: reason,
Message: message,
}
prevConditions := pr.Conditions()
switch conditionType {
case v1beta1.CapacityFound, v1beta1.BookingExpired, v1beta1.Failed:
case v1beta1.Provisioned, v1beta1.BookingExpired, v1beta1.Failed:
conditionFound := false
for _, condition := range prevConditions {
if condition.Type == conditionType {

View File

@ -34,7 +34,7 @@ func TestBookCapacity(t *testing.T) {
name: "BookingExpired",
prConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
{
@ -48,7 +48,7 @@ func TestBookCapacity(t *testing.T) {
name: "Failed",
prConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
{
@ -66,7 +66,7 @@ func TestBookCapacity(t *testing.T) {
name: "Capacity found and provisioned",
prConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
{
@ -80,7 +80,7 @@ func TestBookCapacity(t *testing.T) {
name: "Capacity is not found",
prConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionFalse,
},
},
@ -115,29 +115,29 @@ func TestSetCondition(t *testing.T) {
want []v1.Condition
}{
{
name: "CapacityFound added, empty conditions before",
newType: v1beta1.CapacityFound,
name: "Provisioned added, empty conditions before",
newType: v1beta1.Provisioned,
newStatus: v1.ConditionTrue,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
},
},
{
name: "CapacityFound updated",
name: "Provisioned updated",
oldConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionFalse,
},
},
newType: v1beta1.CapacityFound,
newType: v1beta1.Provisioned,
newStatus: v1.ConditionTrue,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
},
@ -146,7 +146,7 @@ func TestSetCondition(t *testing.T) {
name: "Failed added, non-empty conditions before",
oldConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
},
@ -154,7 +154,7 @@ func TestSetCondition(t *testing.T) {
newStatus: v1.ConditionTrue,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
{
@ -163,28 +163,11 @@ func TestSetCondition(t *testing.T) {
},
},
},
{
name: "Provisioned condition type, conditions are not updated",
oldConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
},
newType: v1beta1.Provisioned,
newStatus: v1.ConditionFalse,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Status: v1.ConditionTrue,
},
},
},
{
name: "Unknown condition status, conditions are updated",
oldConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
},
@ -192,7 +175,7 @@ func TestSetCondition(t *testing.T) {
newStatus: v1.ConditionUnknown,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
{
@ -205,7 +188,7 @@ func TestSetCondition(t *testing.T) {
name: "Unknown condition type, conditions are not updated",
oldConditions: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
},
@ -213,7 +196,7 @@ func TestSetCondition(t *testing.T) {
newStatus: v1.ConditionTrue,
want: []v1.Condition{
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
},
@ -233,19 +216,19 @@ func TestSetCondition(t *testing.T) {
name: "Capacity found with unknown condition before",
oldConditions: []v1.Condition{
{
Type: v1beta1.Provisioned,
Type: "unknown",
Status: v1.ConditionTrue,
},
},
newType: v1beta1.CapacityFound,
newType: v1beta1.Provisioned,
newStatus: v1.ConditionTrue,
want: []v1.Condition{
{
Type: v1beta1.Provisioned,
Type: "unknown",
Status: v1.ConditionTrue,
},
{
Type: v1beta1.CapacityFound,
Type: v1beta1.Provisioned,
Status: v1.ConditionTrue,
},
},
@ -259,7 +242,7 @@ func TestSetCondition(t *testing.T) {
Conditions: test.oldConditions,
},
}, nil)
setCondition(pr, test.newType, test.newStatus, "", "")
setCondition(pr, test.newType, test.newStatus, "", "", v1.Now())
got := pr.Conditions()
if len(got) > 2 || len(got) != len(test.want) || got[0].Type != test.want[0].Type || got[0].Status != test.want[0].Status {
t.Errorf("want %v, got: %v", test.want, got)

View File

@ -127,7 +127,7 @@ func (o *provReqOrchestrator) bookCapacity() error {
// ClusterAutoscaler was able to create pods before, so we shouldn't have an error here.
// If there is an error, mark PR as invalid, because we won't be able to book capacity
// for it anyway.
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err))
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
continue
}
podsToCreate = append(podsToCreate, pods...)
@ -151,10 +151,10 @@ func (o *provReqOrchestrator) scaleUp(unschedulablePods []*apiv1.Pod) (bool, err
}
st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true)
if len(st) < len(unschedulablePods) || err != nil {
setCondition(provReq, v1beta1.CapacityFound, metav1.ConditionFalse, CapacityIsFoundReason, "Capacity is not found, CA will try to find it later.")
setCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
return false, err
}
setCondition(provReq, v1beta1.CapacityFound, metav1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.")
setCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.", metav1.Now())
return true, nil
}

View File

@ -56,7 +56,7 @@ func TestScaleUp(t *testing.T) {
newCpuProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "newCpuProvReq", "5m", "5", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
newMemProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "newMemProvReq", "1m", "100", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
bookedCapacityProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "bookedCapacity", "1m", "200", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
bookedCapacityProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.CapacityFound, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}})
bookedCapacityProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.Provisioned, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}})
expiredProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "bookedCapacity", "1m", "200", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
expiredProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.BookingExpired, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}})
differentProvReqClass := provreqwrapper.BuildTestProvisioningRequest("ns", "differentProvReqClass", "1", "1", "", int32(5), false, time.Now(), v1beta1.ProvisioningClassAtomicScaleUp)

View File

@ -0,0 +1,91 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkcapacity
import (
"time"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)
const (
defaultReservationTime = 10 * time.Minute
defaultExpirationTime = 7 * 24 * time.Hour // 7 days
// defaultMaxUpdated limits how many ProvisioningRequests have their conditions updated in one ClusterAutoscaler loop.
defaultMaxUpdated = 20
)
type checkCapacityProcessor struct {
now func() time.Time
maxUpdated int
}
// NewCheckCapacityProcessor returns a ProvisioningRequestProcessor for the check-capacity ProvisioningClass.
func NewCheckCapacityProcessor() *checkCapacityProcessor {
return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated}
}
// Process iterates over ProvisioningRequests and applies:
// - the BookingExpired condition to Provisioned ProvisioningRequests whose capacity reservation time has expired.
// - the Failed condition to ProvisioningRequests that were not provisioned within defaultExpirationTime.
// TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest
// TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest
func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) {
expiredProvReq := []*provreqwrapper.ProvisioningRequest{}
failedProvReq := []*provreqwrapper.ProvisioningRequest{}
for _, provReq := range provReqs {
if len(expiredProvReq) >= p.maxUpdated {
break
}
conditions := provReq.Conditions()
if provReq.V1Beta1().Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity ||
apimeta.IsStatusConditionTrue(conditions, v1beta1.BookingExpired) || apimeta.IsStatusConditionTrue(conditions, v1beta1.Failed) {
continue
}
provisioned := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned)
if provisioned != nil && provisioned.Status == metav1.ConditionTrue {
if provisioned.LastTransitionTime.Add(defaultReservationTime).Before(p.now()) {
expiredProvReq = append(expiredProvReq, provReq)
}
} else if len(failedProvReq) < p.maxUpdated-len(expiredProvReq) {
created := provReq.CreationTimestamp()
if created.Add(defaultExpirationTime).Before(p.now()) {
failedProvReq = append(failedProvReq, provReq)
}
}
}
updated := 0
for _, provReq := range expiredProvReq {
if updated >= p.maxUpdated {
break
}
setCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, CapacityReservationTimeExpiredReason, CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
updated++
}
for _, provReq := range failedProvReq {
if updated >= p.maxUpdated {
break
}
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, ExpiredReason, ExpiredMsg, metav1.NewTime(p.now()))
updated++
}
}
// CleanUp cleans up internal state.
func (p *checkCapacityProcessor) CleanUp() {}
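
With the defaults above, the expiry checks reduce to simple clock arithmetic; a sketch reusing this file's constants and an explicit now (values chosen for illustration):

now := time.Now()

// Provisioned 11 minutes ago: the 10-minute reservation has lapsed,
// so Process would add BookingExpired.
provisionedAt := now.Add(-11 * time.Minute)
bookingExpired := provisionedAt.Add(defaultReservationTime).Before(now) // true

// Created 8 days ago and never provisioned: past the 7-day expiration,
// so Process would add Failed with ExpiredReason.
createdAt := now.Add(-8 * 24 * time.Hour)
requestExpired := createdAt.Add(defaultExpirationTime).Before(now) // true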

View File

@ -0,0 +1,165 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkcapacity
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)
func TestProcess(t *testing.T) {
now := time.Now()
dayAgo := now.Add(-1 * 24 * time.Hour)
weekAgo := now.Add(-1 * defaultExpirationTime).Add(-1 * 5 * time.Minute)
testCases := []struct {
name string
creationTime time.Time
conditions []metav1.Condition
wantConditions []metav1.Condition
}{
{
name: "New ProvisioningRequest, empty conditions",
creationTime: now,
},
{
name: "ProvisioningRequest with empty conditions, expired",
creationTime: weekAgo,
wantConditions: []metav1.Condition{
{
Type: v1beta1.Failed,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now),
Reason: ExpiredReason,
Message: ExpiredMsg,
},
},
},
{
name: "ProvisioningRequest wasn't provisioned, expired",
creationTime: weekAgo,
conditions: []metav1.Condition{
{
Type: v1beta1.Provisioned,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.NewTime(dayAgo),
Reason: ExpiredReason,
Message: ExpiredMsg,
},
},
wantConditions: []metav1.Condition{
{
Type: v1beta1.Provisioned,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.NewTime(dayAgo),
Reason: ExpiredReason,
Message: ExpiredMsg,
},
{
Type: v1beta1.Failed,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now),
Reason: ExpiredReason,
Message: ExpiredMsg,
},
},
},
{
name: "BookingCapacity time is expired ",
creationTime: dayAgo,
conditions: []metav1.Condition{
{
Type: v1beta1.Provisioned,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(dayAgo),
Reason: ExpiredReason,
Message: ExpiredMsg,
},
},
wantConditions: []metav1.Condition{
{
Type: v1beta1.Provisioned,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(dayAgo),
Reason: ExpiredReason,
Message: ExpiredMsg,
},
{
Type: v1beta1.BookingExpired,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now),
Reason: CapacityReservationTimeExpiredReason,
Message: CapacityReservationTimeExpiredMsg,
},
},
},
{
name: "Failed ProvisioningRequest",
creationTime: dayAgo,
conditions: []metav1.Condition{
{
Type: v1beta1.Failed,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(dayAgo),
Reason: "Failed",
Message: "Failed",
},
},
wantConditions: []metav1.Condition{
{
Type: v1beta1.Failed,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(dayAgo),
Reason: "Failed",
Message: "Failed",
},
},
},
}
for _, test := range testCases {
pr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "name-1")
pr.V1Beta1().Status.Conditions = test.conditions
pr.V1Beta1().CreationTimestamp = metav1.NewTime(test.creationTime)
pr.V1Beta1().Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity
additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional")
additionalPr.V1Beta1().CreationTimestamp = metav1.NewTime(weekAgo)
additionalPr.V1Beta1().Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity
processor := checkCapacityProcessor{func() time.Time { return now }, 1}
processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
assert.ElementsMatch(t, test.wantConditions, pr.Conditions())
if len(test.conditions) == len(test.wantConditions) {
assert.ElementsMatch(t, []metav1.Condition{
{
Type: v1beta1.Failed,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now),
Reason: ExpiredReason,
Message: ExpiredMsg,
},
}, additionalPr.Conditions())
} else {
assert.ElementsMatch(t, []metav1.Condition{}, additionalPr.Conditions())
}
}
}

View File

@ -41,15 +41,15 @@ const (
provisioningRequestClientCallTimeout = 4 * time.Second
)
// ProvisioningRequestClientV1beta1 represents client for v1beta1 ProvReq CRD.
type ProvisioningRequestClientV1beta1 struct {
// ProvisioningRequestClient represents client for v1beta1 ProvReq CRD.
type ProvisioningRequestClient struct {
client versioned.Interface
provReqLister listers.ProvisioningRequestLister
podTemplLister v1.PodTemplateLister
}
// NewProvisioningRequestClient configures and returns a ProvisioningRequestClient.
func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequestClientV1beta1, error) {
func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequestClient, error) {
prClient, err := newPRClient(kubeConfig)
if err != nil {
return nil, fmt.Errorf("Failed to create Provisioning Request client: %v", err)
@ -70,7 +70,7 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequest
return nil, err
}
return &ProvisioningRequestClientV1beta1{
return &ProvisioningRequestClient{
client: prClient,
provReqLister: provReqLister,
podTemplLister: podTemplLister,
@ -78,7 +78,7 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequest
}
// ProvisioningRequest gets a specific ProvisioningRequest CR.
func (c *ProvisioningRequestClientV1beta1) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) {
func (c *ProvisioningRequestClient) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) {
v1Beta1PR, err := c.provReqLister.ProvisioningRequests(namespace).Get(name)
if err != nil {
return nil, err
@ -91,7 +91,7 @@ func (c *ProvisioningRequestClientV1beta1) ProvisioningRequest(namespace, name s
}
// ProvisioningRequests gets all ProvisioningRequest CRs.
func (c *ProvisioningRequestClientV1beta1) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) {
func (c *ProvisioningRequestClient) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) {
v1Beta1PRs, err := c.provReqLister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("error fetching provisioningRequests: %w", err)
@ -108,7 +108,7 @@ func (c *ProvisioningRequestClientV1beta1) ProvisioningRequests() ([]*provreqwra
}
// FetchPodTemplates fetches PodTemplates referenced by the Provisioning Request.
func (c *ProvisioningRequestClientV1beta1) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) {
func (c *ProvisioningRequestClient) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) {
podTemplates := make([]*apiv1.PodTemplate, 0, len(pr.Spec.PodSets))
for _, podSpec := range pr.Spec.PodSets {
podTemplate, err := c.podTemplLister.PodTemplates(pr.Namespace).Get(podSpec.PodTemplateRef.Name)
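
A minimal sketch of consuming the renamed client, assuming an in-cluster rest.Config (error handling abbreviated):

package main

import (
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
	"k8s.io/client-go/rest"
	"k8s.io/klog/v2"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatal(err)
	}
	client, err := provreqclient.NewProvisioningRequestClient(cfg)
	if err != nil {
		klog.Fatal(err)
	}
	// List all ProvisioningRequest CRs via the informer-backed lister.
	prs, err := client.ProvisioningRequests()
	if err != nil {
		klog.Fatal(err)
	}
	klog.Infof("listed %d ProvisioningRequests", len(prs))
}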

View File

@ -25,8 +25,8 @@ import (
)
func TestFetchPodTemplates(t *testing.T) {
pr1 := provisioningRequestBetaForTests("namespace", "name-1")
pr2 := provisioningRequestBetaForTests("namespace", "name-2")
pr1 := ProvisioningRequestWrapperForTesting("namespace", "name-1")
pr2 := ProvisioningRequestWrapperForTesting("namespace", "name-2")
mockProvisioningRequests := []*provreqwrapper.ProvisioningRequest{pr1, pr2}
ctx := context.Background()

View File

@ -35,7 +35,7 @@ import (
)
// NewFakeProvisioningRequestClient mocks ProvisioningRequestClient for tests.
func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *ProvisioningRequestClientV1beta1 {
func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *ProvisioningRequestClient {
t.Helper()
provReqClient := fake.NewSimpleClientset()
podTemplClient := fake_kubernetes.NewSimpleClientset()
@ -60,7 +60,7 @@ func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...
if err != nil {
t.Fatalf("Failed to create Provisioning Request lister. Error was: %v", err)
}
return &ProvisioningRequestClientV1beta1{
return &ProvisioningRequestClient{
client: provReqClient,
provReqLister: provReqLister,
podTemplLister: podTemplLister,
@ -83,7 +83,8 @@ func newFakePodTemplatesLister(t *testing.T, client kubernetes.Interface, channe
return podTemplLister, nil
}
func provisioningRequestBetaForTests(namespace, name string) *provreqwrapper.ProvisioningRequest {
// ProvisioningRequestWrapperForTesting mocks a ProvisioningRequest for tests.
func ProvisioningRequestWrapperForTesting(namespace, name string) *provreqwrapper.ProvisioningRequest {
if namespace == "" {
namespace = "default"
}