Merge pull request #5290 from yaroslava-serdiuk/scale-down

Allow forking snapshot more than 1 time
This commit is contained in:
Kubernetes Prow Robot 2022-11-16 08:14:49 -08:00 committed by GitHub
commit 01389b01e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 166 additions and 112 deletions

View File

@ -116,8 +116,7 @@ func TestFilterOutSchedulableByPacking(t *testing.T) {
filterOutSchedulablePodListProcessor := NewFilterOutSchedulablePodListProcessor()
err = clusterSnapshot.Fork()
assert.NoError(t, err)
clusterSnapshot.Fork()
var expectedPodsInSnapshot = tt.scheduledPods
for _, pod := range tt.expectedFilteredOutPods {
@ -164,10 +163,8 @@ func TestFilterOutSchedulableByPacking(t *testing.T) {
assert.ElementsMatch(t, expectedFilteredOutPodUids, podUidsInHintsMap)
// reset snapshot to initial state and run filterOutSchedulableByPacking with hinting map filled in
err = clusterSnapshot.Revert()
assert.NoError(t, err)
err = clusterSnapshot.Fork()
assert.NoError(t, err)
clusterSnapshot.Revert()
clusterSnapshot.Fork()
stillPendingPods, err = filterOutSchedulablePodListProcessor.filterOutSchedulableByPacking(tt.pendingPods, clusterSnapshot, predicateChecker)
assert.NoError(t, err)

View File

@ -68,10 +68,7 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG
Pods: make([]*apiv1.Pod, 0),
}
if err := context.ClusterSnapshot.Fork(); err != nil {
klog.Errorf("Error while calling ClusterSnapshot.Fork; %v", err)
return expander.Option{}, err
}
context.ClusterSnapshot.Fork()
// add test node to snapshot
var pods []*apiv1.Pod
@ -80,12 +77,9 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG
}
if err := context.ClusterSnapshot.AddNodeWithPods(nodeInfo.Node(), pods); err != nil {
klog.Errorf("Error while adding test Node; %v", err)
if err := context.ClusterSnapshot.Revert(); err != nil {
klog.Fatalf("Error while calling ClusterSnapshot.Revert; %v", err)
}
// TODO: Or should I just skip the node group? specifically if Revert fails it is fatal error.
// Maybe we should not return error from Revert as we cannot handle it in any way on the caller side?
return expander.Option{}, err
context.ClusterSnapshot.Revert()
// TODO: Or should I just skip the node group?
return expander.Option{}, nil
}
for _, eg := range podEquivalenceGroups {
@ -104,10 +98,7 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG
}
}
if err := context.ClusterSnapshot.Revert(); err != nil {
klog.Fatalf("Error while calling ClusterSnapshot.Revert; %v", err)
return expander.Option{}, err
}
context.ClusterSnapshot.Revert()
if len(option.Pods) > 0 {
estimator := context.EstimatorBuilder(context.PredicateChecker, context.ClusterSnapshot)

View File

@ -84,9 +84,9 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
func (p *Planner) UpdateClusterState(podDestinations, scaleDownCandidates []*apiv1.Node, as scaledown.ActuationStatus, pdb []*policyv1.PodDisruptionBudget, currentTime time.Time) errors.AutoscalerError {
p.latestUpdate = currentTime
p.actuationStatus = as
// TODO: clone cluster snapshot to avoid persisting changes done by the
// simulation. Or - better yet - allow the snapshot to be forked twice
// and just fork it here.
// Avoid persisting changes done by the simulation.
p.context.ClusterSnapshot.Fork()
defer p.context.ClusterSnapshot.Revert()
err := p.injectOngoingActuation()
if err != nil {
p.CleanUpUnneededNodes()

View File

@ -76,14 +76,9 @@ func (e *BinpackingNodeEstimator) Estimate(
newNodeNames := make(map[string]bool)
newNodesWithPods := make(map[string]bool)
if err := e.clusterSnapshot.Fork(); err != nil {
klog.Errorf("Error while calling ClusterSnapshot.Fork; %v", err)
return 0, nil
}
e.clusterSnapshot.Fork()
defer func() {
if err := e.clusterSnapshot.Revert(); err != nil {
klog.Fatalf("Error while calling ClusterSnapshot.Revert; %v", err)
}
e.clusterSnapshot.Revert()
}()
newNodeNameIndex := 0

View File

@ -196,9 +196,7 @@ func (r *RemovalSimulator) FindEmptyNodesToRemove(candidates []string, timestamp
}
func (r *RemovalSimulator) withForkedSnapshot(f func() error) (err error) {
if err = r.clusterSnapshot.Fork(); err != nil {
return err
}
r.clusterSnapshot.Fork()
defer func() {
if err == nil && r.canPersist {
cleanupErr := r.clusterSnapshot.Commit()
@ -206,10 +204,7 @@ func (r *RemovalSimulator) withForkedSnapshot(f func() error) (err error) {
klog.Fatalf("Got error when calling ClusterSnapshot.Commit(); %v", cleanupErr)
}
} else {
cleanupErr := r.clusterSnapshot.Revert()
if cleanupErr != nil {
klog.Fatalf("Got error when calling ClusterSnapshot.Revert(); %v", cleanupErr)
}
r.clusterSnapshot.Revert()
}
}()
err = f()

View File

@ -26,8 +26,7 @@ import (
// BasicClusterSnapshot is a simple reference implementation of ClusterSnapshot.
// It is inefficient. But hopefully bug-free and good for initial testing.
type BasicClusterSnapshot struct {
baseData *internalBasicSnapshotData
forkedData *internalBasicSnapshotData
data []*internalBasicSnapshotData
}
type internalBasicSnapshotData struct {
@ -208,10 +207,7 @@ func NewBasicClusterSnapshot() *BasicClusterSnapshot {
}
func (snapshot *BasicClusterSnapshot) getInternalData() *internalBasicSnapshotData {
if snapshot.forkedData != nil {
return snapshot.forkedData
}
return snapshot.baseData
return snapshot.data[len(snapshot.data)-1]
}
// AddNode adds node to the snapshot.
@ -258,36 +254,33 @@ func (snapshot *BasicClusterSnapshot) IsPVCUsedByPods(key string) bool {
}
// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert()
// Forking already forked snapshot is not allowed and will result with an error.
func (snapshot *BasicClusterSnapshot) Fork() error {
if snapshot.forkedData != nil {
return fmt.Errorf("snapshot already forked")
}
snapshot.forkedData = snapshot.baseData.clone()
return nil
func (snapshot *BasicClusterSnapshot) Fork() {
forkData := snapshot.getInternalData().clone()
snapshot.data = append(snapshot.data, forkData)
}
// Revert reverts snapshot state to moment of forking.
func (snapshot *BasicClusterSnapshot) Revert() error {
snapshot.forkedData = nil
return nil
func (snapshot *BasicClusterSnapshot) Revert() {
if len(snapshot.data) == 1 {
return
}
snapshot.data = snapshot.data[:len(snapshot.data)-1]
}
// Commit commits changes done after forking.
func (snapshot *BasicClusterSnapshot) Commit() error {
if snapshot.forkedData == nil {
if len(snapshot.data) <= 1 {
// do nothing
return nil
}
snapshot.baseData = snapshot.forkedData
snapshot.forkedData = nil
snapshot.data = append(snapshot.data[:len(snapshot.data)-2], snapshot.data[len(snapshot.data)-1])
return nil
}
// Clear resets the cluster snapshot to an empty, unforked state
func (snapshot *BasicClusterSnapshot) Clear() {
snapshot.baseData = newInternalBasicSnapshotData()
snapshot.forkedData = nil
baseData := newInternalBasicSnapshotData()
snapshot.data = []*internalBasicSnapshotData{baseData}
}
// implementation of SharedLister interface

View File

@ -20,6 +20,7 @@ import (
"errors"
apiv1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
@ -42,15 +43,36 @@ type ClusterSnapshot interface {
// IsPVCUsedByPods returns if the pvc is used by any pod, key = <namespace>/<pvc_name>
IsPVCUsedByPods(key string) bool
// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert()
// Forking already forked snapshot is not allowed and will result with an error.
Fork() error
// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert().
// Use WithForkedSnapshot() helper function instead if possible.
Fork()
// Revert reverts snapshot state to moment of forking.
Revert() error
Revert()
// Commit commits changes done after forking.
Commit() error
// Clear reset cluster snapshot to empty, unforked state
// Clear resets the cluster snapshot to an empty, unforked state.
Clear()
}
var errNodeNotFound = errors.New("node not found")
// WithForkedSnapshot is a helper function for snapshot that makes sure every Fork() call is closed
// with a matching Commit() or Revert() call.
//
// It forks the snapshot, runs f, and then:
//   - if f returned true, calls Commit(); when Commit() fails the error is logged and the fork is
//     reverted instead;
//   - if f returned false, calls Revert().
//
// The function returns an (error, error) pair. The first error is the one returned by f. The second
// error is the one returned by Commit(), if it was attempted and failed; it is nil otherwise.
//
// The results are named on purpose: the cleanup happens in a deferred function, which runs after the
// return operands have been evaluated, so only a named result lets the Commit() error reach the caller.
func WithForkedSnapshot(snapshot ClusterSnapshot, f func() (bool, error)) (fErr error, cleanupErr error) {
	// commit stays false if f panics, so the deferred cleanup reverts the fork in that case.
	commit := false
	snapshot.Fork()
	defer func() {
		if commit {
			cleanupErr = snapshot.Commit()
			if cleanupErr == nil {
				return
			}
			klog.Errorf("Got error when calling ClusterSnapshot.Commit(), will try to revert; %v", cleanupErr)
		}
		// Either f asked not to commit or the commit failed: drop the fork.
		snapshot.Revert()
	}()
	commit, fErr = f()
	return fErr, cleanupErr
}

View File

@ -188,22 +188,23 @@ func BenchmarkForkAddRevert(b *testing.B) {
err = clusterSnapshot.AddPod(pod, pod.Spec.NodeName)
assert.NoError(b, err)
}
tmpNode := BuildTestNode("tmp", 2000, 2000000)
tmpNode1 := BuildTestNode("tmp-1", 2000, 2000000)
tmpNode2 := BuildTestNode("tmp-2", 2000, 2000000)
b.ResetTimer()
b.Run(fmt.Sprintf("%s: ForkAddRevert (%d nodes, %d pods)", snapshotName, ntc, ptc), func(b *testing.B) {
for i := 0; i < b.N; i++ {
err = clusterSnapshot.Fork()
clusterSnapshot.Fork()
err = clusterSnapshot.AddNode(tmpNode1)
if err != nil {
assert.NoError(b, err)
}
err = clusterSnapshot.AddNode(tmpNode)
if err != nil {
assert.NoError(b, err)
}
err = clusterSnapshot.Revert()
clusterSnapshot.Fork()
err = clusterSnapshot.AddNode(tmpNode2)
if err != nil {
assert.NoError(b, err)
}
clusterSnapshot.Revert()
clusterSnapshot.Revert()
}
})
}
@ -236,9 +237,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
if err := snapshot.AddNodes(nodes[:tc.nodeCount]); err != nil {
assert.NoError(b, err)
}
if err := snapshot.Fork(); err != nil {
assert.NoError(b, err)
}
snapshot.Fork()
if err := snapshot.AddNodes(nodes[tc.nodeCount:]); err != nil {
assert.NoError(b, err)
}

View File

@ -162,6 +162,7 @@ func validTestCases(t *testing.T) []modificationTestCase {
func TestForking(t *testing.T) {
testCases := validTestCases(t)
node := BuildTestNode("specialNode-2", 10, 100)
for name, snapshotFactory := range snapshots {
for _, tc := range testCases {
@ -175,8 +176,7 @@ func TestForking(t *testing.T) {
t.Run(fmt.Sprintf("%s: %s fork", name, tc.name), func(t *testing.T) {
snapshot := startSnapshot(t, snapshotFactory, tc.state)
err := snapshot.Fork()
assert.NoError(t, err)
snapshot.Fork()
tc.op(snapshot)
@ -186,13 +186,26 @@ func TestForking(t *testing.T) {
t.Run(fmt.Sprintf("%s: %s fork & revert", name, tc.name), func(t *testing.T) {
snapshot := startSnapshot(t, snapshotFactory, tc.state)
err := snapshot.Fork()
assert.NoError(t, err)
snapshot.Fork()
tc.op(snapshot)
err = snapshot.Revert()
assert.NoError(t, err)
snapshot.Revert()
// Modifications should no longer be applied.
compareStates(t, tc.state, getSnapshotState(t, snapshot))
})
t.Run(fmt.Sprintf("%s: %s fork & fork & revert & revert", name, tc.name), func(t *testing.T) {
snapshot := startSnapshot(t, snapshotFactory, tc.state)
snapshot.Fork()
tc.op(snapshot)
snapshot.Fork()
snapshot.AddNode(node)
snapshot.Revert()
snapshot.Revert()
// Modifications should no longer be applied.
compareStates(t, tc.state, getSnapshotState(t, snapshot))
@ -200,12 +213,42 @@ func TestForking(t *testing.T) {
t.Run(fmt.Sprintf("%s: %s fork & commit", name, tc.name), func(t *testing.T) {
snapshot := startSnapshot(t, snapshotFactory, tc.state)
err := snapshot.Fork()
assert.NoError(t, err)
snapshot.Fork()
tc.op(snapshot)
err = snapshot.Commit()
err := snapshot.Commit()
assert.NoError(t, err)
// Modifications should be applied.
compareStates(t, tc.modifiedState, getSnapshotState(t, snapshot))
})
t.Run(fmt.Sprintf("%s: %s fork & fork & commit & revert", name, tc.name), func(t *testing.T) {
snapshot := startSnapshot(t, snapshotFactory, tc.state)
snapshot.Fork()
snapshot.Fork()
tc.op(snapshot)
err := snapshot.Commit()
assert.NoError(t, err)
// Modifications should be applied.
compareStates(t, tc.modifiedState, getSnapshotState(t, snapshot))
snapshot.Revert()
// Modifications should no longer be applied.
compareStates(t, tc.state, getSnapshotState(t, snapshot))
})
t.Run(fmt.Sprintf("%s: %s fork & fork & revert & commit", name, tc.name), func(t *testing.T) {
snapshot := startSnapshot(t, snapshotFactory, tc.state)
snapshot.Fork()
tc.op(snapshot)
snapshot.Fork()
snapshot.AddNode(node)
snapshot.Revert()
err := snapshot.Commit()
assert.NoError(t, err)
// Modifications should be applied.
@ -220,7 +263,7 @@ func TestForking(t *testing.T) {
_, err = snapshot.NodeInfos().HavePodsWithAffinityList()
assert.NoError(t, err)
err = snapshot.Fork()
snapshot.Fork()
assert.NoError(t, err)
tc.op(snapshot)
@ -278,10 +321,9 @@ func TestClear(t *testing.T) {
snapshot := startSnapshot(t, snapshotFactory, state)
compareStates(t, state, getSnapshotState(t, snapshot))
err := snapshot.Fork()
assert.NoError(t, err)
snapshot.Fork()
err = snapshot.AddNodes(extraNodes)
err := snapshot.AddNodes(extraNodes)
assert.NoError(t, err)
for _, pod := range extraPods {
@ -291,16 +333,12 @@ func TestClear(t *testing.T) {
compareStates(t, snapshotState{allNodes, allPods}, getSnapshotState(t, snapshot))
// Fork()ing twice is not allowed.
err = snapshot.Fork()
assert.Error(t, err)
snapshot.Clear()
compareStates(t, snapshotState{}, getSnapshotState(t, snapshot))
// Clear() should break out of forked state.
err = snapshot.Fork()
snapshot.Fork()
assert.NoError(t, err)
})
}
@ -346,7 +384,7 @@ func TestNode404(t *testing.T) {
err := snapshot.AddNode(node)
assert.NoError(t, err)
err = snapshot.Fork()
snapshot.Fork()
assert.NoError(t, err)
err = snapshot.RemoveNode("node")
@ -421,7 +459,7 @@ func TestNodeAlreadyExists(t *testing.T) {
err := snapshot.AddNode(node)
assert.NoError(t, err)
err = snapshot.Fork()
snapshot.Fork()
assert.NoError(t, err)
// Node already in base, shouldn't be able to add in fork.
@ -433,10 +471,9 @@ func TestNodeAlreadyExists(t *testing.T) {
func(t *testing.T) {
snapshot := snapshotFactory()
err := snapshot.Fork()
assert.NoError(t, err)
snapshot.Fork()
err = snapshot.AddNode(node)
err := snapshot.AddNode(node)
assert.NoError(t, err)
// Node already in fork.
@ -447,10 +484,9 @@ func TestNodeAlreadyExists(t *testing.T) {
func(t *testing.T) {
snapshot := snapshotFactory()
err := snapshot.Fork()
assert.NoError(t, err)
snapshot.Fork()
err = snapshot.AddNode(node)
err := snapshot.AddNode(node)
assert.NoError(t, err)
err = snapshot.Commit()
@ -662,7 +698,7 @@ func TestPVCClearAndFork(t *testing.T) {
volumeExists := snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1"))
assert.Equal(t, true, volumeExists)
err = snapshot.Fork()
snapshot.Fork()
assert.NoError(t, err)
volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1"))
assert.Equal(t, true, volumeExists)
@ -673,8 +709,7 @@ func TestPVCClearAndFork(t *testing.T) {
volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2"))
assert.Equal(t, true, volumeExists)
err = snapshot.Revert()
assert.NoError(t, err)
snapshot.Revert()
volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2"))
assert.Equal(t, false, volumeExists)
@ -695,3 +730,37 @@ func TestPVCClearAndFork(t *testing.T) {
})
}
}
// TestWithForkedSnapshot verifies that WithForkedSnapshot discards the modifications made by the
// callback when it returns false and keeps them when it returns true, while passing the callback's
// error through in both cases.
func TestWithForkedSnapshot(t *testing.T) {
	opErr := fmt.Errorf("some error")
	cases := validTestCases(t)
	for name, snapshotFactory := range snapshots {
		for _, tc := range cases {
			snapshot := startSnapshot(t, snapshotFactory, tc.state)
			// applyThen builds a callback that applies the test-case operation to the snapshot and
			// then reports the given commit decision together with opErr.
			applyThen := func(commit bool) func() (bool, error) {
				return func() (bool, error) {
					tc.op(snapshot)
					return commit, opErr
				}
			}

			// Both subtests share one snapshot, so the reverting run has to come first:
			// it must leave the snapshot in tc.state for the committing run.
			t.Run(fmt.Sprintf("%s: %s WithForkedSnapshot for failed function", name, tc.name), func(t *testing.T) {
				fErr, cleanupErr := WithForkedSnapshot(snapshot, applyThen(false))
				assert.Error(t, fErr)
				assert.NoError(t, cleanupErr)

				// Modifications should not be applied.
				compareStates(t, tc.state, getSnapshotState(t, snapshot))
			})

			t.Run(fmt.Sprintf("%s: %s WithForkedSnapshot for success function", name, tc.name), func(t *testing.T) {
				fErr, cleanupErr := WithForkedSnapshot(snapshot, applyThen(true))
				assert.Error(t, fErr)
				assert.NoError(t, cleanupErr)

				// Modifications should be applied.
				compareStates(t, tc.modifiedState, getSnapshotState(t, snapshot))
			})
		}
	}
}

View File

@ -444,24 +444,17 @@ func (snapshot *DeltaClusterSnapshot) IsPVCUsedByPods(key string) bool {
}
// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert()
// Forking already forked snapshot is not allowed and will result with an error.
// Time: O(1)
func (snapshot *DeltaClusterSnapshot) Fork() error {
if snapshot.data.baseData != nil {
return fmt.Errorf("snapshot already forked")
}
func (snapshot *DeltaClusterSnapshot) Fork() {
snapshot.data = snapshot.data.fork()
return nil
}
// Revert reverts snapshot state to moment of forking.
// Time: O(1)
func (snapshot *DeltaClusterSnapshot) Revert() error {
func (snapshot *DeltaClusterSnapshot) Revert() {
if snapshot.data.baseData != nil {
snapshot.data = snapshot.data.baseData
}
return nil
}
// Commit commits changes done after forking.