Add an option to Cluster Autoscaler that allows triggering new loops

more frequently: based on new unschedulable pods and every time a
previous iteration was productive.
This commit is contained in:
Karol Wychowaniec 2024-03-06 11:05:32 +00:00
parent 38124f2692
commit 702883d72f
6 changed files with 220 additions and 18 deletions

View File

@ -72,6 +72,10 @@ type Autoscaler interface {
RunOnce(currentTime time.Time) errors.AutoscalerError
// ExitCleanUp is a clean-up performed just before process termination.
ExitCleanUp()
// LastScaleUpTime is a time of the last scale up
LastScaleUpTime() time.Time
// LastScaleUpTime is a time of the last scale down
LastScaleDownDeleteTime() time.Time
}
// NewAutoscaler creates an autoscaler of an appropriate type according to the parameters

View File

@ -215,6 +215,16 @@ func NewStaticAutoscaler(
}
}
// LastScaleUpTime returns last scale up time
func (a *StaticAutoscaler) LastScaleUpTime() time.Time {
return a.lastScaleUpTime
}
// LastScaleDownDeleteTime returns the last successful scale down time
func (a *StaticAutoscaler) LastScaleDownDeleteTime() time.Time {
return a.lastScaleDownDeleteTime
}
// Start starts components running in background.
func (a *StaticAutoscaler) Start() error {
a.clusterStateRegistry.Start()

View File

@ -0,0 +1,44 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package loop
import (
"time"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)
type autoscaler interface {
// RunOnce represents an iteration in the control-loop of CA.
RunOnce(currentTime time.Time) errors.AutoscalerError
}
// RunAutoscalerOnce triggers a single autoscaling iteration.
func RunAutoscalerOnce(autoscaler autoscaler, healthCheck *metrics.HealthCheck, loopStart time.Time) {
metrics.UpdateLastTime(metrics.Main, loopStart)
healthCheck.UpdateLastActivity(loopStart)
err := autoscaler.RunOnce(loopStart)
if err != nil && err.Type() != errors.TransientError {
metrics.RegisterError(err)
} else {
healthCheck.UpdateLastSuccessfulRun(time.Now())
}
metrics.UpdateDurationFromStart(metrics.Main, loopStart)
}

View File

@ -0,0 +1,144 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package loop
import (
"context"
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
)
const maxPodChangeAge = 10 * time.Second
var (
podsResource = "pods"
unschedulablePodSelector = fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
)
// scalingTimesGetter exposes recent autoscaler activity
type scalingTimesGetter interface {
LastScaleUpTime() time.Time
LastScaleDownDeleteTime() time.Time
}
// LoopTrigger object implements criteria used to start new autoscaling iteration
type LoopTrigger struct {
podObserver *UnschedulablePodObserver
scanInterval time.Duration
scalingTimesGetter scalingTimesGetter
}
// NewLoopTrigger creates a LoopTrigger object
func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration) *LoopTrigger {
return &LoopTrigger{
podObserver: podObserver,
scanInterval: scanInterval,
scalingTimesGetter: scalingTimesGetter,
}
}
// Wait waits for the next autoscaling iteration
func (t *LoopTrigger) Wait(lastRun time.Time) {
sleepStart := time.Now()
defer metrics.UpdateDurationFromStart(metrics.LoopWait, sleepStart)
// To improve scale-up throughput, Cluster Autoscaler starts new iteration
// immediately if the previous one was productive.
if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) ||
!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
select {
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
default:
klog.Infof("Autoscaler loop triggered immediately after a productive iteration")
}
return
}
// Unschedulable pod triggers autoscaling immediately.
select {
case <-time.After(t.scanInterval):
klog.Infof("Autoscaler loop triggered by a %v timer", t.scanInterval)
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
}
}
// UnschedulablePodObserver triggers a new loop if there are new unschedulable pods
type UnschedulablePodObserver struct {
unschedulablePodChan <-chan any
}
// StartPodObserver creates an informer and starts a goroutine watching for newly added
// or updated pods. Each time a new unschedulable pod appears or a change causes a pod to become
// unschedulable, a message is sent to the UnschedulablePodObserver's channel.
func StartPodObserver(ctx context.Context, kubeClient kube_client.Interface) *UnschedulablePodObserver {
podChan := make(chan any, 1)
listWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), podsResource, apiv1.NamespaceAll, unschedulablePodSelector)
informer := cache.NewSharedInformer(listWatch, &apiv1.Pod{}, time.Hour)
addEventHandlerFunc := func(obj any) {
if isRecentUnschedulablePod(obj) {
klog.V(5).Infof(" filterPodChanUntilClose emits signal")
select {
case podChan <- struct{}{}:
default:
}
}
}
updateEventHandlerFunc := func(old any, newOjb any) { addEventHandlerFunc(newOjb) }
_, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: addEventHandlerFunc,
UpdateFunc: updateEventHandlerFunc,
})
go informer.Run(ctx.Done())
return &UnschedulablePodObserver{
unschedulablePodChan: podChan,
}
}
// isRecentUnschedulablePod checks if the object is an unschedulable pod observed recently.
func isRecentUnschedulablePod(obj any) bool {
pod, ok := obj.(*apiv1.Pod)
if !ok {
return false
}
if pod.Status.Phase == apiv1.PodSucceeded || pod.Status.Phase == apiv1.PodFailed {
return false
}
if pod.Spec.NodeName != "" {
return false
}
_, scheduledCondition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if scheduledCondition == nil {
return false
}
if scheduledCondition.Status != apiv1.ConditionFalse || scheduledCondition.Reason != "Unschedulable" {
return false
}
if scheduledCondition.LastTransitionTime.Time.Add(maxPodChangeAge).Before(time.Now()) {
return false
}
return true
}

View File

@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/loop"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
@ -62,7 +63,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
@ -258,6 +258,7 @@ var (
"Priority evictor reuses the concepts of drain logic in kubelet(https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2712-pod-priority-based-graceful-node-shutdown#migration-from-the-node-graceful-shutdown-feature)."+
"Eg. flag usage: '10000:20,1000:100,0:60'")
provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
)
func isFlagPassed(name string) bool {
@ -591,23 +592,21 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
}
// Autoscale ad infinitum.
for {
select {
case <-time.After(*scanInterval):
{
loopStart := time.Now()
metrics.UpdateLastTime(metrics.Main, loopStart)
healthCheck.UpdateLastActivity(loopStart)
err := autoscaler.RunOnce(loopStart)
if err != nil && err.Type() != errors.TransientError {
metrics.RegisterError(err)
} else {
healthCheck.UpdateLastSuccessfulRun(time.Now())
}
metrics.UpdateDurationFromStart(metrics.Main, loopStart)
}
context, cancel := ctx.WithCancel(ctx.Background())
defer cancel()
if *frequentLoopsEnabled {
podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval)
lastRun := time.Now()
for {
trigger.Wait(lastRun)
lastRun = time.Now()
loop.RunAutoscalerOnce(autoscaler, healthCheck, lastRun)
}
} else {
for {
time.Sleep(*scanInterval)
loop.RunAutoscalerOnce(autoscaler, healthCheck, time.Now())
}
}
}

View File

@ -114,6 +114,7 @@ const (
Poll FunctionLabel = "poll"
Reconfigure FunctionLabel = "reconfigure"
Autoscaling FunctionLabel = "autoscaling"
LoopWait FunctionLabel = "loopWait"
)
var (