CA: extend ClusterSnapshotStore to allow storing, retrieving and modifying DRA objects

A new DRA Snapshot type is introduced, for now with just dummy methods
to be implemented in later commits. The new type is intended to hold all
DRA objects in the cluster.

ClusterSnapshotStore.SetClusterState() is extended to take the new DRA Snapshot in
addition to the existing parameters.

ClusterSnapshotStore.DraSnapshot() is added to retrieve the DRA snapshot previously
set via SetClusterState(). PredicateSnapshot will later use it to implement DRA
logic.

This commit should be a no-op: DraSnapshot() is never called yet, and all
callers pass an empty DRA snapshot to SetClusterState().
Kuba Tużnik 2024-11-21 16:50:23 +01:00
parent 1e560274d5
commit 466f94b780
15 changed files with 119 additions and 40 deletions
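
To make the API change concrete, here is a minimal Go sketch of the extended interface and a migrated call site. The two interface methods are taken from the interface diff below; the migrate helper is a hypothetical illustration, not code from this commit.

    // Sketch of the extended ClusterSnapshotStore surface after this commit.
    // The two methods mirror the interface diff below; migrate is hypothetical.
    package example

    import (
        apiv1 "k8s.io/api/core/v1"

        drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
    )

    type ClusterSnapshotStore interface {
        // SetClusterState now also takes the DRA snapshot for the whole cluster.
        SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error
        // DraSnapshot returns the DRA snapshot set by SetClusterState.
        DraSnapshot() drasnapshot.Snapshot
    }

    // migrate shows the mechanical change applied to every caller in this commit:
    // code that doesn't track DRA objects yet passes an empty drasnapshot.Snapshot{}.
    func migrate(store ClusterSnapshotStore, nodes []*apiv1.Node, pods []*apiv1.Pod) error {
        return store.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
    }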

View File

@@ -26,6 +26,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/config"
     "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     "k8s.io/autoscaler/cluster-autoscaler/utils/test"
 )
@@ -110,7 +111,7 @@ func TestFilterOutExpendable(t *testing.T) {
         t.Run(tc.name, func(t *testing.T) {
             processor := NewFilterOutExpendablePodListProcessor()
             snapshot := testsnapshot.NewTestSnapshotOrDie(t)
-            err := snapshot.SetClusterState(tc.nodes, nil)
+            err := snapshot.SetClusterState(tc.nodes, nil, drasnapshot.Snapshot{})
             assert.NoError(t, err)
             pods, err := processor.Process(&context.AutoscalingContext{

View File

@@ -27,6 +27,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
     . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -280,7 +281,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
             }
             clusterSnapshot := snapshotFactory()
-            if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil {
+            if err := clusterSnapshot.SetClusterState(nodes, scheduledPods, drasnapshot.Snapshot{}); err != nil {
                 assert.NoError(b, err)
             }

View File

@@ -36,6 +36,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/options"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
     "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -367,7 +368,7 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
     scheduledPods := kube_util.ScheduledPods(pods)
     nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)
-    err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods)
+    err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, drasnapshot.Snapshot{})
     if err != nil {
         return nil, err
     }

View File

@@ -46,6 +46,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
     "k8s.io/autoscaler/cluster-autoscaler/processors/status"
     processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
     kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -1044,7 +1045,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
     // build orchestrator
     context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
     assert.NoError(t, err)
-    err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods))
+    err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods), drasnapshot.Snapshot{})
     assert.NoError(t, err)
     nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
         Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
@@ -1154,7 +1155,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
     }
     context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
     assert.NoError(t, err)
-    err = context.ClusterSnapshot.SetClusterState(nodes, pods)
+    err = context.ClusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
     assert.NoError(t, err)
     nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1197,7 +1198,7 @@ func TestBinpackingLimiter(t *testing.T) {
     context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
     assert.NoError(t, err)
-    err = context.ClusterSnapshot.SetClusterState(nodes, nil)
+    err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
     assert.NoError(t, err)
     nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
         Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
@@ -1257,7 +1258,7 @@ func TestScaleUpNoHelp(t *testing.T) {
     }
     context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
     assert.NoError(t, err)
-    err = context.ClusterSnapshot.SetClusterState(nodes, pods)
+    err = context.ClusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
     assert.NoError(t, err)
     nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1412,7 +1413,7 @@ func TestComputeSimilarNodeGroups(t *testing.T) {
         listers := kube_util.NewListerRegistry(nil, nil, kube_util.NewTestPodLister(nil), nil, nil, nil, nil, nil, nil)
         ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{BalanceSimilarNodeGroups: tc.balancingEnabled}, &fake.Clientset{}, listers, provider, nil, nil)
         assert.NoError(t, err)
-        err = ctx.ClusterSnapshot.SetClusterState(nodes, nil)
+        err = ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
         assert.NoError(t, err)
         nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
         clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1477,7 +1478,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
     }
     context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
     assert.NoError(t, err)
-    err = context.ClusterSnapshot.SetClusterState(nodes, podList)
+    err = context.ClusterSnapshot.SetClusterState(nodes, podList, drasnapshot.Snapshot{})
     assert.NoError(t, err)
     nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1654,7 +1655,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
     assert.NoError(t, err)

     nodes := []*apiv1.Node{n1, n2}
-    err = context.ClusterSnapshot.SetClusterState(nodes, nil)
+    err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
     assert.NoError(t, err)
     nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
     processors := processorstest.NewTestProcessors(&context)

View File

@@ -32,6 +32,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/core/test"
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
     processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
     utils_test "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -71,7 +72,7 @@ func TestDeltaForNode(t *testing.T) {
         ng := testCase.nodeGroupConfig
         group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
-        err := ctx.ClusterSnapshot.SetClusterState(nodes, nil)
+        err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
         assert.NoError(t, err)
         nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
@@ -114,7 +115,7 @@ func TestResourcesLeft(t *testing.T) {
         ng := testCase.nodeGroupConfig
         _, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
-        err := ctx.ClusterSnapshot.SetClusterState(nodes, nil)
+        err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
         assert.NoError(t, err)
         nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
@@ -167,7 +168,7 @@ func TestApplyLimits(t *testing.T) {
         ng := testCase.nodeGroupConfig
         group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
-        err := ctx.ClusterSnapshot.SetClusterState(nodes, nil)
+        err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
         assert.NoError(t, err)
         nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
@@ -234,7 +235,7 @@ func TestResourceManagerWithGpuResource(t *testing.T) {
     assert.NoError(t, err)

     nodes := []*corev1.Node{n1}
-    err = context.ClusterSnapshot.SetClusterState(nodes, nil)
+    err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
     assert.NoError(t, err)
     nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())

View File

@@ -46,6 +46,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/options"
     "k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
@@ -337,7 +338,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
     }
     nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
     // Initialize cluster state to ClusterSnapshot
-    if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil {
+    if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, drasnapshot.Snapshot{}); err != nil {
         return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
     }
     // Initialize Pod Disruption Budget tracking

View File

@@ -27,6 +27,7 @@ import (
     testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
     "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -77,7 +78,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
     nodes := []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}
     snapshot := testsnapshot.NewTestSnapshotOrDie(t)
-    err := snapshot.SetClusterState(nodes, nil)
+    err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
     assert.NoError(t, err)

     ctx := context.AutoscalingContext{
@@ -163,7 +164,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
     nodes := []*apiv1.Node{unready4, unready3, ready2, ready1}
     snapshot := testsnapshot.NewTestSnapshotOrDie(t)
-    err := snapshot.SetClusterState(nodes, nil)
+    err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
     assert.NoError(t, err)

     // Fill cache
@@ -254,7 +255,7 @@ func TestGetNodeInfosCacheExpired(t *testing.T) {
     nodes := []*apiv1.Node{ready1}
     snapshot := testsnapshot.NewTestSnapshotOrDie(t)
-    err := snapshot.SetClusterState(nodes, nil)
+    err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
     assert.NoError(t, err)

     ctx := context.AutoscalingContext{

View File

@@ -20,6 +20,7 @@ import (
     "errors"

     apiv1 "k8s.io/api/core/v1"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/klog/v2"
 )
@@ -60,7 +61,7 @@ type ClusterSnapshotStore interface {
     // SetClusterState resets the snapshot to an unforked state and replaces the contents of the snapshot
     // with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName.
-    SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error
+    SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error

     // ForceAddPod adds the given Pod to the Node with the given nodeName inside the snapshot without checking scheduler predicates.
     ForceAddPod(pod *apiv1.Pod, nodeName string) error
@@ -80,6 +81,9 @@ type ClusterSnapshotStore interface {
     // ListNodeInfos returns internal NodeInfos for all Nodes tracked in the snapshot. See the comment on GetNodeInfo.
     ListNodeInfos() ([]*framework.NodeInfo, error)

+    // DraSnapshot returns an interface that allows accessing and modifying the DRA objects in the snapshot.
+    DraSnapshot() drasnapshot.Snapshot
+
     // Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert().
     // Use WithForkedSnapshot() helper function instead if possible.
     Fork()
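
As a hedged sketch of where this interface is headed (per the commit message, PredicateSnapshot will consume it later): a hypothetical consumer reading DRA objects back through the store. Only SetClusterState(), DraSnapshot(), and the three Snapshot accessors exist in this commit; the listDraObjects helper is illustrative only.

    // Hypothetical future consumer of the new method; not part of this commit.
    // In this commit the accessors still return nil trackers/listers.
    func listDraObjects(store ClusterSnapshotStore) {
        dra := store.DraSnapshot()
        claimTracker := dra.ResourceClaims() // schedulerframework.ResourceClaimTracker (nil for now)
        sliceLister := dra.ResourceSlices()  // schedulerframework.ResourceSliceLister (nil for now)
        classLister := dra.DeviceClasses()   // schedulerframework.DeviceClassLister (nil for now)
        _, _, _ = claimTracker, sliceLister, classLister
    }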

View File

@@ -23,6 +23,7 @@ import (
     "github.com/stretchr/testify/assert"

     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
 )
@@ -39,7 +40,7 @@ func BenchmarkAddNodeInfo(b *testing.B) {
         b.Run(fmt.Sprintf("%s: AddNodeInfo() %d", snapshotName, tc), func(b *testing.B) {
             for i := 0; i < b.N; i++ {
                 b.StopTimer()
-                assert.NoError(b, clusterSnapshot.SetClusterState(nil, nil))
+                assert.NoError(b, clusterSnapshot.SetClusterState(nil, nil, drasnapshot.Snapshot{}))
                 b.StartTimer()
                 for _, node := range nodes {
                     err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
@@ -61,7 +62,7 @@ func BenchmarkListNodeInfos(b *testing.B) {
             nodes := clustersnapshot.CreateTestNodes(tc)
             clusterSnapshot, err := snapshotFactory()
             assert.NoError(b, err)
-            err = clusterSnapshot.SetClusterState(nodes, nil)
+            err = clusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
             if err != nil {
                 assert.NoError(b, err)
             }
@@ -91,14 +92,14 @@ func BenchmarkAddPods(b *testing.B) {
             clustersnapshot.AssignTestPodsToNodes(pods, nodes)
             clusterSnapshot, err := snapshotFactory()
             assert.NoError(b, err)
-            err = clusterSnapshot.SetClusterState(nodes, nil)
+            err = clusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
             assert.NoError(b, err)
             b.ResetTimer()
             b.Run(fmt.Sprintf("%s: ForceAddPod() 30*%d", snapshotName, tc), func(b *testing.B) {
                 for i := 0; i < b.N; i++ {
                     b.StopTimer()
-                    err = clusterSnapshot.SetClusterState(nodes, nil)
+                    err = clusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
                     if err != nil {
                         assert.NoError(b, err)
                     }
@@ -127,7 +128,7 @@ func BenchmarkForkAddRevert(b *testing.B) {
             clustersnapshot.AssignTestPodsToNodes(pods, nodes)
             clusterSnapshot, err := snapshotFactory()
             assert.NoError(b, err)
-            err = clusterSnapshot.SetClusterState(nodes, pods)
+            err = clusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
             assert.NoError(b, err)
             tmpNode1 := BuildTestNode("tmp-1", 2000, 2000000)
             tmpNode2 := BuildTestNode("tmp-2", 2000, 2000000)

View File

@@ -25,6 +25,7 @@ import (
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
     schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -90,7 +91,7 @@ func getSnapshotState(t *testing.T, snapshot clustersnapshot.ClusterSnapshot) sn
 func startSnapshot(t *testing.T, snapshotFactory func() (clustersnapshot.ClusterSnapshot, error), state snapshotState) clustersnapshot.ClusterSnapshot {
     snapshot, err := snapshotFactory()
     assert.NoError(t, err)
-    err = snapshot.SetClusterState(state.nodes, state.pods)
+    err = snapshot.SetClusterState(state.nodes, state.pods, drasnapshot.Snapshot{})
     assert.NoError(t, err)
     return snapshot
 }
@@ -324,7 +325,7 @@ func TestSetClusterState(t *testing.T) {
         snapshot := startSnapshot(t, snapshotFactory, state)
         compareStates(t, state, getSnapshotState(t, snapshot))

-        assert.NoError(t, snapshot.SetClusterState(nil, nil))
+        assert.NoError(t, snapshot.SetClusterState(nil, nil, drasnapshot.Snapshot{}))

         compareStates(t, snapshotState{}, getSnapshotState(t, snapshot))
     })
@@ -335,7 +336,7 @@ func TestSetClusterState(t *testing.T) {
         newNodes, newPods := clustersnapshot.CreateTestNodes(13), clustersnapshot.CreateTestPods(37)
         clustersnapshot.AssignTestPodsToNodes(newPods, newNodes)

-        assert.NoError(t, snapshot.SetClusterState(newNodes, newPods))
+        assert.NoError(t, snapshot.SetClusterState(newNodes, newPods, drasnapshot.Snapshot{}))

         compareStates(t, snapshotState{nodes: newNodes, pods: newPods}, getSnapshotState(t, snapshot))
     })
@@ -358,7 +359,7 @@ func TestSetClusterState(t *testing.T) {
         compareStates(t, snapshotState{allNodes, allPods}, getSnapshotState(t, snapshot))

-        assert.NoError(t, snapshot.SetClusterState(nil, nil))
+        assert.NoError(t, snapshot.SetClusterState(nil, nil, drasnapshot.Snapshot{}))

         compareStates(t, snapshotState{}, getSnapshotState(t, snapshot))
@@ -754,7 +755,7 @@ func TestPVCClearAndFork(t *testing.T) {
         volumeExists := snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1"))
         assert.Equal(t, true, volumeExists)

-        assert.NoError(t, snapshot.SetClusterState(nil, nil))
+        assert.NoError(t, snapshot.SetClusterState(nil, nil, drasnapshot.Snapshot{}))

         volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1"))
         assert.Equal(t, false, volumeExists)

View File

@@ -21,6 +21,7 @@ import (

     apiv1 "k8s.io/api/core/v1"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/klog/v2"
     schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -205,6 +206,12 @@ func (snapshot *BasicSnapshotStore) getInternalData() *internalBasicSnapshotData
     return snapshot.data[len(snapshot.data)-1]
 }

+// DraSnapshot returns the DRA snapshot.
+func (snapshot *BasicSnapshotStore) DraSnapshot() drasnapshot.Snapshot {
+    // TODO(DRA): Return DRA snapshot.
+    return drasnapshot.Snapshot{}
+}
+
 // GetNodeInfo gets a NodeInfo.
 func (snapshot *BasicSnapshotStore) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) {
     schedNodeInfo, err := snapshot.getInternalData().getNodeInfo(nodeName)
@@ -238,7 +245,7 @@ func (snapshot *BasicSnapshotStore) AddNodeInfo(nodeInfo *framework.NodeInfo) er
 }

 // SetClusterState sets the cluster state.
-func (snapshot *BasicSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error {
+func (snapshot *BasicSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error {
     snapshot.clear()

     knownNodes := make(map[string]bool)
@@ -255,6 +262,7 @@ func (snapshot *BasicSnapshotStore) SetClusterState(nodes []*apiv1.Node, schedul
             }
         }
     }
+    // TODO(DRA): Save DRA snapshot.
     return nil
 }
@@ -325,17 +333,17 @@ func (snapshot *BasicSnapshotStore) StorageInfos() schedulerframework.StorageInf
 // ResourceClaims exposes snapshot as ResourceClaimTracker
 func (snapshot *BasicSnapshotStore) ResourceClaims() schedulerframework.ResourceClaimTracker {
-    return nil
+    return snapshot.DraSnapshot().ResourceClaims()
 }

 // ResourceSlices exposes snapshot as ResourceSliceLister.
 func (snapshot *BasicSnapshotStore) ResourceSlices() schedulerframework.ResourceSliceLister {
-    return nil
+    return snapshot.DraSnapshot().ResourceSlices()
 }

 // DeviceClasses exposes the snapshot as DeviceClassLister.
 func (snapshot *BasicSnapshotStore) DeviceClasses() schedulerframework.DeviceClassLister {
-    return nil
+    return snapshot.DraSnapshot().DeviceClasses()
 }

 // List returns the list of nodes in the snapshot.

View File

@@ -21,6 +21,7 @@ import (

     apiv1 "k8s.io/api/core/v1"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/klog/v2"
     schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -389,17 +390,17 @@ func (snapshot *DeltaSnapshotStore) StorageInfos() schedulerframework.StorageInf
 // ResourceClaims exposes snapshot as ResourceClaimTracker
 func (snapshot *DeltaSnapshotStore) ResourceClaims() schedulerframework.ResourceClaimTracker {
-    return nil
+    return snapshot.DraSnapshot().ResourceClaims()
 }

 // ResourceSlices exposes snapshot as ResourceSliceLister.
 func (snapshot *DeltaSnapshotStore) ResourceSlices() schedulerframework.ResourceSliceLister {
-    return nil
+    return snapshot.DraSnapshot().ResourceSlices()
 }

 // DeviceClasses exposes the snapshot as DeviceClassLister.
 func (snapshot *DeltaSnapshotStore) DeviceClasses() schedulerframework.DeviceClassLister {
-    return nil
+    return snapshot.DraSnapshot().DeviceClasses()
 }

 // NewDeltaSnapshotStore creates instances of DeltaSnapshotStore.
@@ -409,6 +410,12 @@ func NewDeltaSnapshotStore() *DeltaSnapshotStore {
     return snapshot
 }

+// DraSnapshot returns the DRA snapshot.
+func (snapshot *DeltaSnapshotStore) DraSnapshot() drasnapshot.Snapshot {
+    // TODO(DRA): Return DRA snapshot.
+    return drasnapshot.Snapshot{}
+}
+
 // GetNodeInfo gets a NodeInfo.
 func (snapshot *DeltaSnapshotStore) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) {
     schedNodeInfo, err := snapshot.getNodeInfo(nodeName)
@@ -442,7 +449,7 @@ func (snapshot *DeltaSnapshotStore) AddNodeInfo(nodeInfo *framework.NodeInfo) er
 }

 // SetClusterState sets the cluster state.
-func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error {
+func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error {
     snapshot.clear()

     knownNodes := make(map[string]bool)
@@ -459,6 +466,7 @@ func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, schedul
             }
         }
     }
+    // TODO(DRA): Save DRA snapshot.
     return nil
 }

View File

@@ -23,6 +23,7 @@ import (
     "github.com/stretchr/testify/assert"

     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
 )
@@ -48,7 +49,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
         b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) {
             nodes := clustersnapshot.CreateTestNodes(tc.nodeCount + 1000)
             deltaStore := NewDeltaSnapshotStore()
-            if err := deltaStore.SetClusterState(nodes[:tc.nodeCount], nil); err != nil {
+            if err := deltaStore.SetClusterState(nodes[:tc.nodeCount], nil, drasnapshot.Snapshot{}); err != nil {
                 assert.NoError(b, err)
             }
             deltaStore.Fork()
@@ -68,7 +69,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
         b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) {
             nodes := clustersnapshot.CreateTestNodes(tc.nodeCount)
             deltaStore := NewDeltaSnapshotStore()
-            if err := deltaStore.SetClusterState(nodes, nil); err != nil {
+            if err := deltaStore.SetClusterState(nodes, nil, drasnapshot.Snapshot{}); err != nil {
                 assert.NoError(b, err)
             }
             b.ResetTimer()

View File

@@ -25,6 +25,7 @@ import (
     "github.com/stretchr/testify/assert"

     apiv1 "k8s.io/api/core/v1"
+    drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/autoscaler/cluster-autoscaler/utils/test"
 )
@@ -38,7 +39,7 @@ func InitializeClusterSnapshotOrDie(
     pods []*apiv1.Pod) {
     var err error

-    assert.NoError(t, snapshot.SetClusterState(nil, nil))
+    assert.NoError(t, snapshot.SetClusterState(nil, nil, drasnapshot.Snapshot{}))

     for _, node := range nodes {
         err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))

View File

@ -0,0 +1,48 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snapshot
import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
// Snapshot contains a snapshot of all DRA objects taken at a ~single point in time.
type Snapshot struct {
}
// ResourceClaims exposes the Snapshot as schedulerframework.ResourceClaimTracker, in order to interact with
// the scheduler framework.
func (s Snapshot) ResourceClaims() schedulerframework.ResourceClaimTracker {
return nil
}
// ResourceSlices exposes the Snapshot as schedulerframework.ResourceSliceLister, in order to interact with
// the scheduler framework.
func (s Snapshot) ResourceSlices() schedulerframework.ResourceSliceLister {
return nil
}
// DeviceClasses exposes the Snapshot as schedulerframework.DeviceClassLister, in order to interact with
// the scheduler framework.
func (s Snapshot) DeviceClasses() schedulerframework.DeviceClassLister {
return nil
}
// Clone returns a copy of this Snapshot that can be independently modified without affecting this Snapshot.
func (s Snapshot) Clone() Snapshot {
return Snapshot{}
}
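
A short hedged note on Clone(): per its doc comment, it exists so that a copy of the DRA state can be modified independently, which lines up with the store's Fork()/Revert() semantics. A hypothetical sketch of that pattern (the forkDra helper is not part of this commit):

    // Hypothetical illustration of the intended Clone() semantics: a fork takes
    // its own copy of the DRA state, so reverting can simply discard it while
    // the base Snapshot stays untouched. Not part of this commit.
    func forkDra(base Snapshot) Snapshot {
        forked := base.Clone() // independent copy; mutations must not affect base
        // ... apply simulated DRA changes to forked here ...
        return forked // dropping the returned value reverts to base
    }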