CA: plumb the DRA provider to SetClusterState callsites, grab and pass DRA snapshot
The new logic is flag-guarded; it should be a no-op if DRA is disabled.
parent c5cb8a077d
commit 55388f1136
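Every SetClusterState callsite below applies the same guard: the DRA provider is only consulted when the DynamicResourceAllocationEnabled flag is set and a provider was actually plumbed in; otherwise an empty drasnapshot.Snapshot is passed and the call behaves as before. A minimal sketch of that pattern as a standalone Go helper (the helper and its parameter list are illustrative; the types and calls are the ones used in the diff below):

package example

import (
    apiv1 "k8s.io/api/core/v1"

    "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
    draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
)

// setClusterStateWithDRA is a hypothetical helper, not part of this commit. It
// shows the flag-guarded pattern used at the real callsites: grab a DRA snapshot
// only when the feature is enabled and a provider exists, otherwise pass the
// zero value so the snapshot setup is a no-op with respect to DRA.
func setClusterStateWithDRA(snapshot clustersnapshot.ClusterSnapshot, nodes []*apiv1.Node, pods []*apiv1.Pod, draEnabled bool, draProvider *draprovider.Provider) error {
    var draSnapshot drasnapshot.Snapshot
    if draEnabled && draProvider != nil {
        // Taking the DRA snapshot can fail, so the error is surfaced to the caller.
        var err error
        draSnapshot, err = draProvider.Snapshot()
        if err != nil {
            return err
        }
    }
    return snapshot.SetClusterState(nodes, pods, draSnapshot)
}

In the actual change the flag lives on the AutoscalingContext, and each caller maps the error to its own handling: a plain error in Actuator.createSnapshot, a caerrors.ApiCallError in StaticAutoscaler.RunOnce.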
cluster-autoscaler/core/autoscaler.go

@@ -36,6 +36,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
+    draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/options"
     "k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
@@ -63,6 +64,7 @@ type AutoscalerOptions struct {
     ScaleUpOrchestrator scaleup.Orchestrator
     DeleteOptions options.NodeDeleteOptions
     DrainabilityRules rules.Rules
+    DraProvider *draprovider.Provider
 }

 // Autoscaler is the main component of CA which scales up/down node groups according to its configuration
@@ -102,6 +104,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
         opts.ScaleUpOrchestrator,
         opts.DeleteOptions,
         opts.DrainabilityRules,
+        opts.DraProvider,
     ), nil
 }

@@ -165,6 +168,9 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
     if opts.DrainabilityRules == nil {
         opts.DrainabilityRules = rules.Default(opts.DeleteOptions)
     }
+    if opts.DraProvider == nil && opts.DynamicResourceAllocationEnabled {
+        opts.DraProvider = draprovider.NewProviderFromInformers(informerFactory)
+    }

     return nil
 }
cluster-autoscaler/core/scaledown/actuation/actuator.go

@@ -36,6 +36,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
+    draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
     drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/options"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@@ -64,6 +65,7 @@ type Actuator struct {
     configGetter actuatorNodeGroupConfigGetter
     nodeDeleteDelayAfterTaint time.Duration
     pastLatencies *expiring.List
+    draProvider *draprovider.Provider
 }

 // actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used
@@ -74,7 +76,7 @@ type actuatorNodeGroupConfigGetter interface {
 }

 // NewActuator returns a new instance of Actuator.
-func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
+func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter, draProvider *draprovider.Provider) *Actuator {
     ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
     legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
     var evictor Evictor
@@ -93,6 +95,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch
         configGetter: configGetter,
         nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
         pastLatencies: expiring.NewList(),
+        draProvider: draProvider,
     }
 }

@@ -368,7 +371,15 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
     scheduledPods := kube_util.ScheduledPods(pods)
     nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

-    err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, drasnapshot.Snapshot{})
+    var draSnapshot drasnapshot.Snapshot
+    if a.ctx.DynamicResourceAllocationEnabled && a.draProvider != nil {
+        draSnapshot, err = a.draProvider.Snapshot()
+        if err != nil {
+            return nil, err
+        }
+    }
+
+    err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, draSnapshot)
     if err != nil {
         return nil, err
     }
cluster-autoscaler/core/static_autoscaler.go

@@ -46,6 +46,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
+    draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
     drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/options"
@@ -92,6 +93,7 @@ type StaticAutoscaler struct {
     processorCallbacks *staticAutoscalerProcessorCallbacks
     initialized bool
     taintConfig taints.TaintConfig
+    draProvider *draprovider.Provider
 }

 type staticAutoscalerProcessorCallbacks struct {
@@ -144,7 +146,8 @@ func NewStaticAutoscaler(
     remainingPdbTracker pdb.RemainingPdbTracker,
     scaleUpOrchestrator scaleup.Orchestrator,
     deleteOptions options.NodeDeleteOptions,
-    drainabilityRules rules.Rules) *StaticAutoscaler {
+    drainabilityRules rules.Rules,
+    draProvider *draprovider.Provider) *StaticAutoscaler {

     clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
         MaxTotalUnreadyPercentage: opts.MaxTotalUnreadyPercentage,
@@ -174,7 +177,7 @@ func NewStaticAutoscaler(
     processorCallbacks.scaleDownPlanner = scaleDownPlanner

     ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
-    scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
+    scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor, draProvider)
     autoscalingContext.ScaleDownActuator = scaleDownActuator

     if scaleUpOrchestrator == nil {
@@ -198,6 +201,7 @@ func NewStaticAutoscaler(
         processorCallbacks: processorCallbacks,
         clusterStateRegistry: clusterStateRegistry,
         taintConfig: taintConfig,
+        draProvider: draProvider,
     }
 }

@@ -337,8 +341,16 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
         metrics.UpdateMaxNodesCount(maxNodesCount)
     }
     nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
-    // Initialize cluster state to ClusterSnapshot
-    if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, drasnapshot.Snapshot{}); err != nil {
+
+    var draSnapshot drasnapshot.Snapshot
+    if a.AutoscalingContext.DynamicResourceAllocationEnabled && a.draProvider != nil {
+        draSnapshot, err = a.draProvider.Snapshot()
+        if err != nil {
+            return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
+        }
+    }
+
+    if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, draSnapshot); err != nil {
         return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
     }
     // Initialize Pod Disruption Budget tracking
cluster-autoscaler/core/static_autoscaler_test.go

@@ -163,7 +163,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error {

 func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions config.AutoscalingOptions) {
     deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
-    ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor)
+    ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor, nil)
 }

 type nodeGroup struct {
@@ -1450,7 +1450,7 @@ func TestStaticAutoscalerRunOnceWithUnselectedNodeGroups(t *testing.T) {
     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingOptions.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)

     // Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
-    sdActuator := actuation.NewActuator(&context, clusterState, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor)
+    sdActuator := actuation.NewActuator(&context, clusterState, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor, nil)
     context.ScaleDownActuator = sdActuator

     // Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
@@ -2097,7 +2097,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
     csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker)

     // Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
-    actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor)
+    actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor, nil)
     ctx.ScaleDownActuator = actuator

     // Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
@@ -2669,7 +2669,7 @@ func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_proce
         nodeDeletionTracker = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
     }
     planner := planner.New(ctx, p, deleteOptions, nil)
-    actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, deleteOptions, nil, p.NodeGroupConfigProcessor)
+    actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, deleteOptions, nil, p.NodeGroupConfigProcessor, nil)
     return planner, actuator
 }