CA: rename ClusterSnapshot AddPod, RemovePod, RemoveNode

RemoveNode is renamed to RemoveNodeInfo for consistency with other
NodeInfo methods.

For DRA, the snapshot may have to allocate ResourceClaims when adding a Pod
to a Node, and deallocate them when removing a Pod from a Node. This will
happen in new methods added to ClusterSnapshot in later commits: SchedulePod
and UnschedulePod. Going forward, these new methods should be the "default"
way of moving pods around the snapshot.
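To make the intent concrete, here is a rough sketch of how those future methods
might be declared. The names come from this commit message; the signatures and
the interface name below are only assumptions and may differ from what the
later commits actually add:

package clustersnapshot

import apiv1 "k8s.io/api/core/v1"

// podMover is a hypothetical illustration only; the real methods are added to
// ClusterSnapshot in later commits and their signatures may differ.
type podMover interface {
	// SchedulePod would add the pod to the named node and, for DRA, allocate
	// any ResourceClaims the pod references.
	SchedulePod(pod *apiv1.Pod, nodeName string) error
	// UnschedulePod would remove the pod from the named node and deallocate
	// the ResourceClaims that were allocated for it.
	UnschedulePod(namespace, podName, nodeName string) error
}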

However, we'll still need to be able to add and remove pods in the snapshot
"forcefully" to handle some corner cases (e.g. expendable pods). AddPod is
renamed to ForceAddPod, and RemovePod to ForceRemovePod, to highlight that
these are no longer the "default" way of moving pods around the snapshot, and
that they bypass the scheduling logic (such as DRA ResourceClaim allocation)
that the new methods will perform.
Author: Kuba Tużnik
Date:   2024-10-01 12:20:36 +02:00
Parent: a81aa5c616
Commit: f67db627e2
11 changed files with 58 additions and 56 deletions

View File

@@ -23,7 +23,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)
type filterOutExpendable struct {
@@ -56,7 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
// CA logic from before migration to scheduler framework. So let's keep it for now
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
-if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
+if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}

View File

@@ -464,7 +464,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming)
// Remove the nodes from the snapshot as well so that the state is consistent.
for _, notStartedNodeName := range allRegisteredUpcoming {
-err := a.ClusterSnapshot.RemoveNode(notStartedNodeName)
+err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName)
if err != nil {
klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
// ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the
@@ -660,16 +660,16 @@ func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[
nodeGroups := a.nodeGroupsById()
upcomingNodeGroups := make(map[string]int)
upcomingNodesFromUpcomingNodeGroups := 0
-for nodeGroupName, upcomingNodes := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
+for nodeGroupName, upcomingNodeInfos := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
nodeGroup := nodeGroups[nodeGroupName]
if nodeGroup == nil {
return fmt.Errorf("failed to find node group: %s", nodeGroupName)
}
isUpcomingNodeGroup := a.processors.AsyncNodeGroupStateChecker.IsUpcoming(nodeGroup)
-for _, upcomingNode := range upcomingNodes {
-err := a.ClusterSnapshot.AddNodeInfo(upcomingNode)
+for _, upcomingNodeInfo := range upcomingNodeInfos {
+err := a.ClusterSnapshot.AddNodeInfo(upcomingNodeInfo)
if err != nil {
return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNode.Node().Name, err)
return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNodeInfo.Node().Name, err)
}
if isUpcomingNodeGroup {
upcomingNodesFromUpcomingNodeGroups++

View File

@@ -25,7 +25,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)
// BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods.
@@ -225,7 +225,7 @@ func (e *BinpackingNodeEstimator) tryToAddNode(
pod *apiv1.Pod,
nodeName string,
) error {
-if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil {
+if err := e.clusterSnapshot.ForceAddPod(pod, nodeName); err != nil {
return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err)
}
estimationState.newNodesWithPods[nodeName] = true

View File

@@ -32,7 +32,7 @@ import (
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)
// NodeToBeRemoved contain information about a node that can be removed.
@@ -223,7 +223,7 @@ func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, n
// remove pods from clusterSnapshot first
for _, pod := range pods {
-if err := r.clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
+if err := r.clusterSnapshot.ForceRemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
// just log error
klog.Errorf("Simulating removal of %s/%s return error; %v", pod.Namespace, pod.Name, err)
}

View File

@@ -153,7 +153,7 @@ func (data *internalBasicSnapshotData) addNode(node *apiv1.Node) error {
return nil
}
-func (data *internalBasicSnapshotData) removeNode(nodeName string) error {
+func (data *internalBasicSnapshotData) removeNodeInfo(nodeName string) error {
if _, found := data.nodeInfoMap[nodeName]; !found {
return ErrNodeNotFound
}
@@ -253,18 +253,18 @@ func (snapshot *BasicClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched
return nil
}
-// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
-func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error {
-return snapshot.getInternalData().removeNode(nodeName)
+// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot.
+func (snapshot *BasicClusterSnapshot) RemoveNodeInfo(nodeName string) error {
+return snapshot.getInternalData().removeNodeInfo(nodeName)
}
-// AddPod adds pod to the snapshot and schedules it to given node.
-func (snapshot *BasicClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error {
+// ForceAddPod adds pod to the snapshot and schedules it to given node.
+func (snapshot *BasicClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error {
return snapshot.getInternalData().addPod(pod, nodeName)
}
-// RemovePod removes pod from the snapshot.
-func (snapshot *BasicClusterSnapshot) RemovePod(namespace, podName, nodeName string) error {
+// ForceRemovePod removes pod from the snapshot.
+func (snapshot *BasicClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error {
return snapshot.getInternalData().removePod(namespace, podName, nodeName)
}

View File

@@ -34,16 +34,17 @@ type ClusterSnapshot interface {
// with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName.
SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error
-// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
-RemoveNode(nodeName string) error
-// AddPod adds pod to the snapshot and schedules it to given node.
-AddPod(pod *apiv1.Pod, nodeName string) error
-// RemovePod removes pod from the snapshot.
-RemovePod(namespace string, podName string, nodeName string) error
+// ForceAddPod adds the given Pod to the Node with the given nodeName inside the snapshot.
+ForceAddPod(pod *apiv1.Pod, nodeName string) error
+// ForceRemovePod removes the given Pod (and all DRA objects it owns) from the snapshot.
+ForceRemovePod(namespace string, podName string, nodeName string) error
// AddNodeInfo adds the given NodeInfo to the snapshot. The Node and the Pods are added, as well as
// any DRA objects passed along them.
AddNodeInfo(nodeInfo *framework.NodeInfo) error
+// RemoveNodeInfo removes the given NodeInfo from the snapshot. The Node and the Pods are removed, as well as
+// any DRA objects owned by them.
+RemoveNodeInfo(nodeName string) error
// GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot.
// This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos
// obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo.

View File

@@ -133,7 +133,7 @@ func BenchmarkAddPods(b *testing.B) {
err := clusterSnapshot.SetClusterState(nodes, nil)
assert.NoError(b, err)
b.ResetTimer()
b.Run(fmt.Sprintf("%s: AddPod() 30*%d", snapshotName, tc), func(b *testing.B) {
b.Run(fmt.Sprintf("%s: ForceAddPod() 30*%d", snapshotName, tc), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
@@ -143,7 +143,7 @@ func BenchmarkAddPods(b *testing.B) {
}
b.StartTimer()
for _, pod := range pods {
-err = clusterSnapshot.AddPod(pod, pod.Spec.NodeName)
+err = clusterSnapshot.ForceAddPod(pod, pod.Spec.NodeName)
if err != nil {
assert.NoError(b, err)
}

View File

@@ -115,22 +115,22 @@ func validTestCases(t *testing.T) []modificationTestCase {
},
},
{
name: "remove node",
name: "remove nodeInfo",
state: snapshotState{
nodes: []*apiv1.Node{node},
},
op: func(snapshot ClusterSnapshot) {
-err := snapshot.RemoveNode(node.Name)
+err := snapshot.RemoveNodeInfo(node.Name)
assert.NoError(t, err)
},
},
{
name: "remove node, then add it back",
name: "remove nodeInfo, then add it back",
state: snapshotState{
nodes: []*apiv1.Node{node},
},
op: func(snapshot ClusterSnapshot) {
-err := snapshot.RemoveNode(node.Name)
+err := snapshot.RemoveNodeInfo(node.Name)
assert.NoError(t, err)
err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
@@ -141,14 +141,14 @@ func validTestCases(t *testing.T) []modificationTestCase {
},
},
{
name: "add pod, then remove node",
name: "add pod, then remove nodeInfo",
state: snapshotState{
nodes: []*apiv1.Node{node},
},
op: func(snapshot ClusterSnapshot) {
-err := snapshot.AddPod(pod, node.Name)
+err := snapshot.ForceAddPod(pod, node.Name)
assert.NoError(t, err)
-err = snapshot.RemoveNode(node.Name)
+err = snapshot.RemoveNodeInfo(node.Name)
assert.NoError(t, err)
},
},
@@ -326,7 +326,7 @@ func TestClear(t *testing.T) {
}
for _, pod := range extraPods {
-err := snapshot.AddPod(pod, pod.Spec.NodeName)
+err := snapshot.ForceAddPod(pod, pod.Spec.NodeName)
assert.NoError(t, err)
}
@@ -349,17 +349,17 @@ func TestNode404(t *testing.T) {
op func(ClusterSnapshot) error
}{
{"add pod", func(snapshot ClusterSnapshot) error {
-return snapshot.AddPod(BuildTestPod("p1", 0, 0), "node")
+return snapshot.ForceAddPod(BuildTestPod("p1", 0, 0), "node")
}},
{"remove pod", func(snapshot ClusterSnapshot) error {
return snapshot.RemovePod("default", "p1", "node")
return snapshot.ForceRemovePod("default", "p1", "node")
}},
{"get node", func(snapshot ClusterSnapshot) error {
_, err := snapshot.NodeInfos().Get("node")
return err
}},
{"remove node", func(snapshot ClusterSnapshot) error {
return snapshot.RemoveNode("node")
{"remove nodeInfo", func(snapshot ClusterSnapshot) error {
return snapshot.RemoveNodeInfo("node")
}},
}
@@ -385,7 +385,7 @@ func TestNode404(t *testing.T) {
snapshot.Fork()
assert.NoError(t, err)
err = snapshot.RemoveNode("node")
err = snapshot.RemoveNodeInfo("node")
assert.NoError(t, err)
// Node deleted after fork - shouldn't be able to operate on it.
@@ -408,7 +408,7 @@ func TestNode404(t *testing.T) {
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)
err = snapshot.RemoveNode("node")
err = snapshot.RemoveNodeInfo("node")
assert.NoError(t, err)
// Node deleted from base - shouldn't be able to operate on it.
@@ -625,7 +625,7 @@ func TestPVCUsedByPods(t *testing.T) {
assert.Equal(t, tc.exists, volumeExists)
if tc.removePod != "" {
err = snapshot.RemovePod("default", tc.removePod, "node")
err = snapshot.ForceRemovePod("default", tc.removePod, "node")
assert.NoError(t, err)
volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName))
@@ -698,7 +698,7 @@ func TestPVCClearAndFork(t *testing.T) {
volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1"))
assert.Equal(t, true, volumeExists)
err = snapshot.AddPod(pod2, "node")
err = snapshot.ForceAddPod(pod2, "node")
assert.NoError(t, err)
volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2"))

View File

@@ -177,7 +177,7 @@ func (data *internalDeltaSnapshotData) clearPodCaches() {
data.pvcNamespaceMap = nil
}
-func (data *internalDeltaSnapshotData) removeNode(nodeName string) error {
+func (data *internalDeltaSnapshotData) removeNodeInfo(nodeName string) error {
_, foundInDelta := data.addedNodeInfoMap[nodeName]
if foundInDelta {
// If node was added within this delta, delete this change.
@@ -296,12 +296,12 @@ func (data *internalDeltaSnapshotData) commit() (*internalDeltaSnapshotData, err
return data, nil
}
for node := range data.deletedNodeInfos {
-if err := data.baseData.removeNode(node); err != nil {
+if err := data.baseData.removeNodeInfo(node); err != nil {
return nil, err
}
}
for _, node := range data.modifiedNodeInfoMap {
-if err := data.baseData.removeNode(node.Node().Name); err != nil {
+if err := data.baseData.removeNodeInfo(node.Node().Name); err != nil {
return nil, err
}
if err := data.baseData.addNodeInfo(node); err != nil {
@@ -442,18 +442,18 @@ func (snapshot *DeltaClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched
return nil
}
-// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
-func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error {
-return snapshot.data.removeNode(nodeName)
+// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot.
+func (snapshot *DeltaClusterSnapshot) RemoveNodeInfo(nodeName string) error {
+return snapshot.data.removeNodeInfo(nodeName)
}
-// AddPod adds pod to the snapshot and schedules it to given node.
-func (snapshot *DeltaClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error {
+// ForceAddPod adds pod to the snapshot and schedules it to given node.
+func (snapshot *DeltaClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error {
return snapshot.data.addPod(pod, nodeName)
}
-// RemovePod removes pod from the snapshot.
-func (snapshot *DeltaClusterSnapshot) RemovePod(namespace, podName, nodeName string) error {
+// ForceRemovePod removes pod from the snapshot.
+func (snapshot *DeltaClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error {
return snapshot.data.removePod(namespace, podName, nodeName)
}

View File

@@ -20,6 +20,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)
@@ -42,10 +43,10 @@ func InitializeClusterSnapshotOrDie(
for _, pod := range pods {
if pod.Spec.NodeName != "" {
-err = snapshot.AddPod(pod, pod.Spec.NodeName)
+err = snapshot.ForceAddPod(pod, pod.Spec.NodeName)
assert.NoError(t, err, "error while adding pod %s/%s to node %s", pod.Namespace, pod.Name, pod.Spec.NodeName)
} else if pod.Status.NominatedNodeName != "" {
-err = snapshot.AddPod(pod, pod.Status.NominatedNodeName)
+err = snapshot.ForceAddPod(pod, pod.Status.NominatedNodeName)
assert.NoError(t, err, "error while adding pod %s/%s to nominated node %s", pod.Namespace, pod.Name, pod.Status.NominatedNodeName)
} else {
assert.Fail(t, "pod %s/%s does not have Spec.NodeName nor Status.NominatedNodeName set", pod.Namespace, pod.Name)

View File

@@ -73,7 +73,7 @@ func (s *HintingSimulator) TrySchedulePods(clusterSnapshot clustersnapshot.Clust
if nodeName != "" {
klogx.V(4).UpTo(loggingQuota).Infof("Pod %s/%s can be moved to %s", pod.Namespace, pod.Name, nodeName)
-if err := clusterSnapshot.AddPod(pod, nodeName); err != nil {
+if err := clusterSnapshot.ForceAddPod(pod, nodeName); err != nil {
return nil, 0, fmt.Errorf("simulating scheduling of %s/%s to %s return error; %v", pod.Namespace, pod.Name, nodeName, err)
}
statuses = append(statuses, Status{Pod: pod, NodeName: nodeName})