Merge pull request #4828 from x13n/ndt

Make NodeDeletionTracker implement ActuationStatus interface
Authored by Kubernetes Prow Robot on 2022-04-28 08:55:50 -07:00, committed by GitHub
commit 561a9da9e4
10 changed files with 342 additions and 133 deletions

View File

@@ -18,92 +18,153 @@ package deletiontracker
import (
"sync"
"time"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/utils/expiring"
apiv1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"
"k8s.io/utils/clock"
)
// NodeDeletionTracker keeps track of node deletions.
// TODO: extend to implement ActuationStatus interface
type NodeDeletionTracker struct {
sync.Mutex
nonEmptyNodeDeleteInProgress bool
// A map of node delete results by node name. It's being constantly emptied into ScaleDownStatus
// objects in order to notify the ScaleDownStatusProcessor that the node drain has ended or that
// an error occurred during the deletion process.
nodeDeleteResults map[string]status.NodeDeleteResult
// A map which keeps track of deletions in progress for nodepools.
// Key is a node group id and value is a number of node deletions in progress.
deletionsInProgress map[string]int
deletionsPerNodeGroup map[string]int
// This mapping contains node names of all empty nodes currently undergoing deletion.
emptyNodeDeletions map[string]bool
// This mapping contains node names of all nodes currently undergoing drain and deletion.
drainedNodeDeletions map[string]bool
// Clock for checking current time.
clock clock.PassiveClock
// Helper struct for tracking pod evictions.
evictions *expiring.List
// How long evictions are considered as recent.
evictionsTTL time.Duration
// Helper struct for tracking deletion results.
deletionResults *expiring.List
}
type deletionResult struct {
nodeName string
result status.NodeDeleteResult
}
// NewNodeDeletionTracker creates new NodeDeletionTracker.
func NewNodeDeletionTracker() *NodeDeletionTracker {
func NewNodeDeletionTracker(podEvictionsTTL time.Duration) *NodeDeletionTracker {
return &NodeDeletionTracker{
nodeDeleteResults: make(map[string]status.NodeDeleteResult),
deletionsInProgress: make(map[string]int),
deletionsPerNodeGroup: make(map[string]int),
emptyNodeDeletions: make(map[string]bool),
drainedNodeDeletions: make(map[string]bool),
clock: clock.RealClock{},
evictions: expiring.NewList(),
evictionsTTL: podEvictionsTTL,
deletionResults: expiring.NewList(),
}
}
// IsNonEmptyNodeDeleteInProgress returns true if a non empty node is being deleted.
func (n *NodeDeletionTracker) IsNonEmptyNodeDeleteInProgress() bool {
n.Lock()
defer n.Unlock()
return n.nonEmptyNodeDeleteInProgress
}
// SetNonEmptyNodeDeleteInProgress sets non empty node deletion in progress status.
func (n *NodeDeletionTracker) SetNonEmptyNodeDeleteInProgress(status bool) {
n.Lock()
defer n.Unlock()
n.nonEmptyNodeDeleteInProgress = status
}
// StartDeletion increments node deletion in progress counter for the given nodegroup.
func (n *NodeDeletionTracker) StartDeletion(nodeGroupId string) {
func (n *NodeDeletionTracker) StartDeletion(nodeGroupId, nodeName string) {
n.Lock()
defer n.Unlock()
n.deletionsInProgress[nodeGroupId]++
n.deletionsPerNodeGroup[nodeGroupId]++
n.emptyNodeDeletions[nodeName] = true
}
// StartDeletionWithDrain is equivalent to StartDeletion, but for counting nodes that are drained first.
func (n *NodeDeletionTracker) StartDeletionWithDrain(nodeGroupId, nodeName string) {
n.Lock()
defer n.Unlock()
n.deletionsPerNodeGroup[nodeGroupId]++
n.drainedNodeDeletions[nodeName] = true
}
// EndDeletion decrements node deletion in progress counter for the given nodegroup.
func (n *NodeDeletionTracker) EndDeletion(nodeGroupId string) {
func (n *NodeDeletionTracker) EndDeletion(nodeGroupId, nodeName string, result status.NodeDeleteResult) {
n.Lock()
defer n.Unlock()
value, found := n.deletionsInProgress[nodeGroupId]
n.deletionResults.RegisterElement(&deletionResult{nodeName, result})
value, found := n.deletionsPerNodeGroup[nodeGroupId]
if !found {
klog.Errorf("This should never happen, counter for %s in DelayedNodeDeletionStatus wasn't found", nodeGroupId)
klog.Errorf("This should never happen, counter for %s in NodeDeletionTracker wasn't found", nodeGroupId)
return
}
if value <= 0 {
klog.Errorf("This should never happen, counter for %s in DelayedNodeDeletionStatus isn't greater than 0, counter value is %d", nodeGroupId, value)
klog.Errorf("This should never happen, counter for %s in NodeDeletionTracker isn't greater than 0, counter value is %d", nodeGroupId, value)
}
n.deletionsInProgress[nodeGroupId]--
if n.deletionsInProgress[nodeGroupId] <= 0 {
delete(n.deletionsInProgress, nodeGroupId)
n.deletionsPerNodeGroup[nodeGroupId]--
if n.deletionsPerNodeGroup[nodeGroupId] <= 0 {
delete(n.deletionsPerNodeGroup, nodeGroupId)
}
delete(n.emptyNodeDeletions, nodeName)
delete(n.drainedNodeDeletions, nodeName)
}
// GetDeletionsInProgress returns the number of deletions in progress for the given node group.
func (n *NodeDeletionTracker) GetDeletionsInProgress(nodeGroupId string) int {
// DeletionsInProgress returns a list of all node names currently undergoing deletion.
func (n *NodeDeletionTracker) DeletionsInProgress() ([]string, []string) {
n.Lock()
defer n.Unlock()
return n.deletionsInProgress[nodeGroupId]
return mapKeysSlice(n.emptyNodeDeletions), mapKeysSlice(n.drainedNodeDeletions)
}
// AddNodeDeleteResult adds a node delete result to the result map.
func (n *NodeDeletionTracker) AddNodeDeleteResult(nodeName string, result status.NodeDeleteResult) {
n.Lock()
defer n.Unlock()
n.nodeDeleteResults[nodeName] = result
func mapKeysSlice(m map[string]bool) []string {
s := make([]string, len(m))
i := 0
for k := range m {
s[i] = k
i++
}
return s
}
// GetAndClearNodeDeleteResults returns the whole result map and replaces it with a new empty one.
func (n *NodeDeletionTracker) GetAndClearNodeDeleteResults() map[string]status.NodeDeleteResult {
// RegisterEviction stores information about a pod that was recently evicted.
func (n *NodeDeletionTracker) RegisterEviction(pod *apiv1.Pod) {
n.Lock()
defer n.Unlock()
results := n.nodeDeleteResults
n.nodeDeleteResults = make(map[string]status.NodeDeleteResult)
return results
n.evictions.RegisterElement(pod)
}
// RecentEvictions returns a list of pods that were recently evicted by Cluster Autoscaler.
func (n *NodeDeletionTracker) RecentEvictions() []*apiv1.Pod {
n.Lock()
defer n.Unlock()
n.evictions.DropNotNewerThan(n.clock.Now().Add(-n.evictionsTTL))
els := n.evictions.ToSlice()
pods := make([]*apiv1.Pod, 0, len(els))
for _, el := range els {
pods = append(pods, el.(*apiv1.Pod))
}
return pods
}
// DeletionsCount returns the number of deletions in progress for the given node group.
func (n *NodeDeletionTracker) DeletionsCount(nodeGroupId string) int {
n.Lock()
defer n.Unlock()
return n.deletionsPerNodeGroup[nodeGroupId]
}
// DeletionResults returns deletion results in a map form, along with the timestamp of last result.
func (n *NodeDeletionTracker) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
n.Lock()
defer n.Unlock()
els, ts := n.deletionResults.ToSliceWithTimestamp()
drs := make(map[string]status.NodeDeleteResult)
for _, el := range els {
dr := el.(*deletionResult)
drs[dr.nodeName] = dr.result
}
return drs, ts
}
// ClearResultsNotNewerThan iterates over existing deletion results and keeps
// only the ones that are newer than the provided timestamp.
func (n *NodeDeletionTracker) ClearResultsNotNewerThan(t time.Time) {
n.Lock()
defer n.Unlock()
n.deletionResults.DropNotNewerThan(t)
}
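
For orientation, below is a minimal, hypothetical sketch of driving the reworked NodeDeletionTracker API from this file. The node group id ("ng1") and node names are made up, and error handling is omitted.

package main

import (
    "fmt"
    "time"

    apiv1 "k8s.io/api/core/v1"
    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
    "k8s.io/autoscaler/cluster-autoscaler/processors/status"
)

func main() {
    // Keep records of recent pod evictions for 5 minutes.
    ndt := deletiontracker.NewNodeDeletionTracker(5 * time.Minute)

    // An empty node and a drained node start deletion in the same node group.
    ndt.StartDeletion("ng1", "empty-node")
    ndt.StartDeletionWithDrain("ng1", "drained-node")

    empty, drained := ndt.DeletionsInProgress()
    fmt.Println(empty, drained, ndt.DeletionsCount("ng1")) // [empty-node] [drained-node] 2

    // Evictions performed during a drain can be registered and read back
    // while they are still within the TTL.
    ndt.RegisterEviction(&apiv1.Pod{})
    fmt.Println(len(ndt.RecentEvictions())) // 1

    // Finishing a deletion records its result and clears the in-progress entry.
    ndt.EndDeletion("ng1", "empty-node", status.NodeDeleteResult{ResultType: status.NodeDeleteOk})

    // DeletionResults also returns the timestamp of the newest result, which
    // can later be passed to ClearResultsNotNewerThan to drop results that
    // have already been consumed.
    results, asOf := ndt.DeletionResults()
    fmt.Println(len(results), asOf)
    ndt.ClearResultsNotNewerThan(asOf)
}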

View File

@@ -295,7 +295,7 @@ func NewScaleDown(context *context.AutoscalingContext, processors *processors.Au
nodeUtilizationMap: make(map[string]utilization.Info),
usageTracker: simulator.NewUsageTracker(),
unneededNodesList: make([]*apiv1.Node, 0),
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(),
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0 * time.Second),
unremovableNodeReasons: make(map[string]*simulator.UnremovableNode),
}
}
@@ -637,11 +637,6 @@ func (sd *ScaleDown) UnremovableNodes() []*simulator.UnremovableNode {
return ns
}
// IsNonEmptyNodeDeleteInProgress returns true if any nodes are being deleted.
func (sd *ScaleDown) IsNonEmptyNodeDeleteInProgress() bool {
return sd.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress()
}
// markSimulationError indicates a simulation error by clearing relevant scale
// down state and returning an appropriate error.
func (sd *ScaleDown) markSimulationError(simulatorErr errors.AutoscalerError,
@@ -692,8 +687,8 @@ func (sd *ScaleDown) TryToScaleDown(
currentTime time.Time,
pdbs []*policyv1.PodDisruptionBudget,
) (*status.ScaleDownStatus, errors.AutoscalerError) {
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: sd.nodeDeletionTracker.GetAndClearNodeDeleteResults()}
ndr, ts := sd.nodeDeletionTracker.DeletionResults()
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: ndr, NodeDeleteResultsAsOf: ts}
nodeDeletionDuration := time.Duration(0)
findNodesToRemoveDuration := time.Duration(0)
defer updateScaleDownMetrics(time.Now(), &findNodesToRemoveDuration, &nodeDeletionDuration)
@@ -790,7 +785,7 @@ func (sd *ScaleDown) TryToScaleDown(
continue
}
deletionsInProgress := sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id())
deletionsInProgress := sd.nodeDeletionTracker.DeletionsCount(nodeGroup.Id())
if size-deletionsInProgress <= nodeGroup.MinSize() {
klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
sd.addUnremovableNodeReason(node, simulator.NodeGroupMinSizeReached)
@@ -890,19 +885,16 @@ func (sd *ScaleDown) TryToScaleDown(
// Starting deletion.
nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(true)
nodeGroup, found := candidateNodeGroups[toRemove.Node.Name]
if !found {
return scaleDownStatus, errors.NewAutoscalerError(errors.InternalError, "failed to find node group for %s", toRemove.Node.Name)
}
sd.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), toRemove.Node.Name)
go func() {
// Finishing the delete process once this goroutine is over.
var result status.NodeDeleteResult
defer func() { sd.nodeDeletionTracker.AddNodeDeleteResult(toRemove.Node.Name, result) }()
defer sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(false)
nodeGroup, found := candidateNodeGroups[toRemove.Node.Name]
if !found {
result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: errors.NewAutoscalerError(
errors.InternalError, "failed to find node group for %s", toRemove.Node.Name)}
return
}
defer func() { sd.nodeDeletionTracker.EndDeletion(nodeGroup.Id(), toRemove.Node.Name, result) }()
result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule, toRemove.DaemonSetPods, nodeGroup)
if result.ResultType != status.NodeDeleteOk {
klog.Errorf("Failed to delete %s: %v", toRemove.Node.Name, result.Err)
@@ -968,7 +960,7 @@ func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits
klog.Errorf("Failed to get size for %s: %v ", nodeGroup.Id(), err)
continue
}
deletionsInProgress := sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id())
deletionsInProgress := sd.nodeDeletionTracker.DeletionsCount(nodeGroup.Id())
available = size - nodeGroup.MinSize() - deletionsInProgress
if available < 0 {
available = 0
@@ -1016,10 +1008,9 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodesToRemove []simulator.Nod
}
deletedNodes = append(deletedNodes, empty.Node)
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, evictByDefault bool) {
sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id())
defer sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id())
sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id(), nodeToDelete.Name)
var result status.NodeDeleteResult
defer func() { sd.nodeDeletionTracker.AddNodeDeleteResult(nodeToDelete.Name, result) }()
defer func() { sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id(), nodeToDelete.Name, result) }()
var deleteErr errors.AutoscalerError
// If we fail to delete the node we want to remove delete taint
@@ -1110,9 +1101,6 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)}
}
sd.nodeDeletionTracker.StartDeletion(nodeGroup.Id())
defer sd.nodeDeletionTracker.EndDeletion(nodeGroup.Id())
// If we fail to evict all the pods from the node we want to remove delete taint
defer func() {
if !deleteSuccessful {
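
The drained-node path above follows a start/defer-end pattern around the deletion goroutine. A hedged sketch of that pattern with placeholder names (deleteDrainedNodeAsync is not part of this change):

package example

import (
    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
    "k8s.io/autoscaler/cluster-autoscaler/processors/status"
)

// deleteDrainedNodeAsync is a hypothetical helper: register the deletion
// before launching the goroutine, then report exactly one result when the
// goroutine finishes, whatever happens in between.
func deleteDrainedNodeAsync(ndt *deletiontracker.NodeDeletionTracker, nodeGroupId, nodeName string) {
    ndt.StartDeletionWithDrain(nodeGroupId, nodeName)
    go func() {
        var result status.NodeDeleteResult
        defer func() { ndt.EndDeletion(nodeGroupId, nodeName, result) }()
        // ... drain pods and delete the node here ...
        result = status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
    }()
}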

View File

@@ -1220,7 +1220,8 @@ func TestScaleDown(t *testing.T) {
func waitForDeleteToFinish(t *testing.T, sd *ScaleDown) {
for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
if !sd.IsNonEmptyNodeDeleteInProgress() {
_, drained := sd.nodeDeletionTracker.DeletionsInProgress()
if len(drained) == 0 {
return
}
}
@@ -1530,9 +1531,9 @@ func TestScaleDownEmptyMinGroupSizeLimitHit(t *testing.T) {
}
func TestScaleDownEmptyMinGroupSizeLimitHitWhenOneNodeIsBeingDeleted(t *testing.T) {
nodeDeletionTracker := deletiontracker.NewNodeDeletionTracker()
nodeDeletionTracker.StartDeletion("ng1")
nodeDeletionTracker.StartDeletion("ng1")
nodeDeletionTracker := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
nodeDeletionTracker.StartDeletion("ng1", "n1")
nodeDeletionTracker.StartDeletion("ng1", "n2")
options := defaultScaleDownOptions
config := &ScaleTestConfig{
Nodes: []NodeConfig{
@@ -1622,7 +1623,6 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
autoscalererr = scaleDown.UpdateUnneededNodes(nodes, nodes, time.Now().Add(-5*time.Minute), nil)
assert.NoError(t, autoscalererr)
scaleDownStatus, err := scaleDown.TryToScaleDown(time.Now(), nil)
assert.False(t, scaleDown.IsNonEmptyNodeDeleteInProgress())
assert.NoError(t, err)
var expectedScaleDownResult status.ScaleDownResult
@@ -1652,6 +1652,8 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
assert.Equal(t, expectedScaleDownCount, len(deleted))
assert.Subset(t, config.ExpectedScaleDowns, deleted)
_, nonEmptyDeletions := scaleDown.nodeDeletionTracker.DeletionsInProgress()
assert.Equal(t, 0, len(nonEmptyDeletions))
}
func TestNoScaleDownUnready(t *testing.T) {

View File

@@ -20,7 +20,6 @@ import (
"time"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@@ -89,49 +88,11 @@ func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node, current
func (p *ScaleDownWrapper) CheckStatus() scaledown.ActuationStatus {
// TODO: snapshot information from the tracker instead of keeping live
// updated object.
return &actuationStatus{
ndt: p.sd.nodeDeletionTracker,
}
return p.sd.nodeDeletionTracker
}
// ClearResultsNotNewerThan clears old node deletion results kept by the
// Actuator.
func (p *ScaleDownWrapper) ClearResultsNotNewerThan(t time.Time) {
// TODO: implement this once results are not cleared while being
// fetched.
}
type actuationStatus struct {
ndt *deletiontracker.NodeDeletionTracker
}
// DeletionsInProgress returns node names of currently deleted nodes.
// Current implementation is not aware of the actual nodes names, so it returns
// a fake node name instead.
// TODO: Return real node names
func (a *actuationStatus) DeletionsInProgress() []string {
if a.ndt.IsNonEmptyNodeDeleteInProgress() {
return []string{"fake-node-name"}
}
return nil
}
// DeletionsCount returns total number of ongoing deletions in a given node
// group.
func (a *actuationStatus) DeletionsCount(nodeGroupId string) int {
return a.ndt.GetDeletionsInProgress(nodeGroupId)
}
// RecentEvictions should return a list of recently evicted pods. Since legacy
// scale down logic only drains at most one node at a time, this safeguard is
// not really needed there, so we can just return an empty list.
func (a *actuationStatus) RecentEvictions() []*apiv1.Pod {
return nil
}
// DeletionResults returns a map of recent node deletion results.
func (a *actuationStatus) DeletionResults() map[string]status.NodeDeleteResult {
// TODO: update nodeDeletionTracker so it doesn't get & clear in the
// same step.
return a.ndt.GetAndClearNodeDeleteResults()
p.sd.nodeDeletionTracker.ClearResultsNotNewerThan(t)
}

View File

@@ -67,9 +67,10 @@ type Actuator interface {
// ActuationStatus is used for feeding Actuator status back into Planner
type ActuationStatus interface {
// DeletionsInProgress returns a list of nodes that are currently
// undergoing deletion.
DeletionsInProgress() (nodeNames []string)
// DeletionsInProgress returns two lists of node names that are
// currently undergoing deletion, for empty and non-empty (i.e. drained)
// nodes separately.
DeletionsInProgress() (empty, drained []string)
// DeletionsCount returns total number of ongoing deletions in a given
// node group.
DeletionsCount(nodeGroupId string) int
@@ -80,5 +81,7 @@ type ActuationStatus interface {
// DeletionResults returns a map of recent node deletion results, keyed
// by the node name. Note: if node deletion was scheduled more than
// once, only the latest result will be present.
DeletionResults() map[string]status.NodeDeleteResult
// The timestamp returned as the second value indicates the time at
// which the last result was collected.
DeletionResults() (map[string]status.NodeDeleteResult, time.Time)
}
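
A hedged sketch of how a consumer of this interface (for example, the planner) could read the new shape; the "ng1" node group id and the logging are illustrative only:

package example

import (
    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
    klog "k8s.io/klog/v2"
)

// logActuation is a hypothetical helper that dumps the state exposed by the
// extended ActuationStatus interface.
func logActuation(as scaledown.ActuationStatus) {
    empty, drained := as.DeletionsInProgress()
    klog.Infof("Deletions in progress: %d empty, %d drained", len(empty), len(drained))
    klog.Infof("Ongoing deletions in ng1: %d", as.DeletionsCount("ng1"))
    klog.Infof("Recently evicted pods: %d", len(as.RecentEvictions()))
    results, asOf := as.DeletionResults()
    klog.Infof("Got %d deletion results, collected as of %v", len(results), asOf)
}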

View File

@@ -523,22 +523,23 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
// TODO(x13n): Move deletionsInProgress > 0 condition to the legacy scaledown implementation.
deletionsInProgress := len(actuationStatus.DeletionsInProgress())
// TODO(x13n): Move nonEmptyDeletionsCount > 0 condition to the legacy scaledown implementation.
_, nonEmptyDeletionsInProgress := actuationStatus.DeletionsInProgress()
nonEmptyDeletionsCount := len(nonEmptyDeletionsInProgress)
// In dry run only utilization is updated
calculateUnneededOnly := scaleDownInCooldown || deletionsInProgress > 0
calculateUnneededOnly := scaleDownInCooldown || nonEmptyDeletionsCount > 0
klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+
"lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v "+
"deletionsInProgress=%v scaleDownInCooldown=%v",
"nonEmptyDeletionsCount=%v scaleDownInCooldown=%v",
calculateUnneededOnly, a.lastScaleUpTime,
a.lastScaleDownDeleteTime, a.lastScaleDownFailTime, a.processorCallbacks.disableScaleDownForLoop,
deletionsInProgress, scaleDownInCooldown)
nonEmptyDeletionsCount, scaleDownInCooldown)
metrics.UpdateScaleDownInCooldown(scaleDownInCooldown)
if scaleDownInCooldown {
scaleDownStatus.Result = status.ScaleDownInCooldown
} else if deletionsInProgress > 0 {
} else if nonEmptyDeletionsCount > 0 {
scaleDownStatus.Result = status.ScaleDownInProgress
} else {
klog.V(4).Infof("Starting scale down")
@@ -554,6 +555,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
empty, needDrain := a.scaleDownPlanner.NodesToDelete()
scaleDownStatus, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain, currentTime)
a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf)
metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
metrics.UpdateUnremovableNodesCount(countsByReason(a.scaleDownPlanner.UnremovableNodes()))

View File

@@ -1331,7 +1331,9 @@ func nodeNames(ns []*apiv1.Node) []string {
func waitForDeleteToFinish(t *testing.T, sda scaledown.Actuator) {
for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
if len(sda.CheckStatus().DeletionsInProgress()) == 0 {
_, dip := sda.CheckStatus().DeletionsInProgress()
klog.Infof("Non empty deletions in progress: %v", dip)
if len(dip) == 0 {
return
}
}

View File

@@ -17,6 +17,8 @@ limitations under the License.
package status
import (
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
@@ -28,11 +30,12 @@ import (
// ScaleDownStatus represents the state of scale down.
type ScaleDownStatus struct {
Result ScaleDownResult
ScaledDownNodes []*ScaleDownNode
UnremovableNodes []*UnremovableNode
RemovedNodeGroups []cloudprovider.NodeGroup
NodeDeleteResults map[string]NodeDeleteResult
Result ScaleDownResult
ScaledDownNodes []*ScaleDownNode
UnremovableNodes []*UnremovableNode
RemovedNodeGroups []cloudprovider.NodeGroup
NodeDeleteResults map[string]NodeDeleteResult
NodeDeleteResultsAsOf time.Time
}
// SetUnremovableNodesInfo sets the status of nodes that were found to be unremovable.

View File

@@ -0,0 +1,93 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package expiring
import (
"container/list"
"time"
"k8s.io/utils/clock"
)
type elementWithTimestamp struct {
value interface{}
added time.Time
}
// List tracks elements along with their creation times.
// This is essentially a linked list with timestamp on each entry, allowing
// dropping old entries. This struct is not thread safe.
// TODO(x13n): Migrate to generics once supported by Go stdlib (container/list
// in particular).
type List struct {
lst list.List
clock clock.PassiveClock
}
// NewList creates a new expiring list.
func NewList() *List {
return newListWithClock(clock.RealClock{})
}
// Warning: This object doesn't support time travel. Subsequent calls to
// clock.Now are expected to return non-decreasing time values.
func newListWithClock(clock clock.PassiveClock) *List {
return &List{
clock: clock,
}
}
// ToSlice converts the underlying list of elements into a slice.
func (e *List) ToSlice() []interface{} {
p := e.lst.Front()
ps := make([]interface{}, 0, e.lst.Len())
for p != nil {
ps = append(ps, p.Value.(*elementWithTimestamp).value)
p = p.Next()
}
return ps
}
// ToSliceWithTimestamp is identical to ToSlice, but additionally returns the
// timestamp of newest entry (or current time if there are no entries).
func (e *List) ToSliceWithTimestamp() ([]interface{}, time.Time) {
if e.lst.Len() == 0 {
return nil, e.clock.Now()
}
return e.ToSlice(), e.lst.Back().Value.(*elementWithTimestamp).added
}
// RegisterElement adds new element to the list.
func (e *List) RegisterElement(elem interface{}) {
e.lst.PushBack(&elementWithTimestamp{elem, e.clock.Now()})
}
// DropNotNewerThan removes all elements of the list that are older or exactly
// as old as the provided time.
func (e *List) DropNotNewerThan(expiry time.Time) {
p := e.lst.Front()
for p != nil {
if p.Value.(*elementWithTimestamp).added.After(expiry) {
// First not-expired element on the list, skip checking
// the rest.
return
}
d := p
p = p.Next()
e.lst.Remove(d)
}
}
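
A small, hypothetical usage sketch of the expiring.List added here; the import path matches the one used by the deletion tracker above:

package main

import (
    "fmt"
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/utils/expiring"
)

func main() {
    l := expiring.NewList()
    l.RegisterElement("a")
    l.RegisterElement("b")

    // Drop everything registered at least a minute ago; both elements are
    // newer than that, so they survive.
    l.DropNotNewerThan(time.Now().Add(-time.Minute))

    els, newest := l.ToSliceWithTimestamp()
    fmt.Println(len(els), newest) // 2, and the timestamp of "b"
}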

View File

@@ -0,0 +1,94 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package expiring
import (
"testing"
"testing/quick"
"time"
klog "k8s.io/klog/v2"
clock "k8s.io/utils/clock/testing"
)
func TestToSlice(t *testing.T) {
if err := quick.Check(identityCheck, nil); err != nil {
t.Error(err)
}
}
func identityCheck(list []int) bool {
l := NewList()
l.registerElementsFrom(list)
return l.equals(list)
}
func TestDropNotNewer(t *testing.T) {
if err := quick.Check(dropChecks, nil); err != nil {
t.Error(err)
}
}
func dropChecks(l1, l2, l3 []int) bool {
t0 := time.Now()
c := clock.NewFakePassiveClock(t0)
t1, t2 := t0.Add(1*time.Minute), t0.Add(2*time.Minute)
l := newListWithClock(c)
l.registerElementsFrom(l1)
c.SetTime(t1)
l.registerElementsFrom(l2)
c.SetTime(t2)
if !l.equals(append(l1, l2...)) {
return false
}
l.DropNotNewerThan(t0)
if !l.equals(l2) {
return false
}
l.registerElementsFrom(l3)
if !l.equals(append(l2, l3...)) {
return false
}
l.DropNotNewerThan(t1)
if !l.equals(l3) {
return false
}
l.DropNotNewerThan(t2)
return len(l.ToSlice()) == 0
}
func (e *List) registerElementsFrom(list []int) {
for _, i := range list {
e.RegisterElement(i)
}
}
func (e *List) equals(want []int) bool {
got := e.ToSlice()
if len(got) != len(want) {
klog.Errorf("len(%v) != len(%v)", got, want)
return false
}
for i, g := range got {
w := want[i]
if g.(int) != w {
klog.Errorf("%v != %v (difference at index %v)", got, want, i)
return false
}
}
return true
}