Don't pass NodeGroup as a parameter to functions running asynchronously
This commit is contained in:
parent
1ecb84389b
commit
257e66c9b4
|
|
@ -71,7 +71,7 @@ func NewNodeDeletionBatcher(ctx *context.AutoscalingContext, csr *clusterstate.C
|
|||
func (d *NodeDeletionBatcher) AddNodes(nodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool) {
|
||||
// If delete interval is 0, then instantly start node deletion.
|
||||
if d.deleteInterval == 0 {
|
||||
err := deleteNodesFromCloudProvider(d.ctx, nodes, nodeGroup)
|
||||
_, err := deleteNodesFromCloudProvider(d.ctx, nodes)
|
||||
for _, node := range nodes {
|
||||
if err != nil {
|
||||
result := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
|
||||
|
|
@ -84,10 +84,11 @@ func (d *NodeDeletionBatcher) AddNodes(nodes []*apiv1.Node, nodeGroup cloudprovi
|
|||
}
|
||||
first := d.addNodesToBucket(nodes, nodeGroup, drain)
|
||||
if first {
|
||||
go func(nodeGroup cloudprovider.NodeGroup) {
|
||||
// Just in case a node group implementation is not thread-safe, the async "remove" function will obtain a new instance of it to perform deletion.
|
||||
go func(nodeGroupId string) {
|
||||
time.Sleep(d.deleteInterval)
|
||||
d.remove(nodeGroup)
|
||||
}(nodeGroup)
|
||||
d.remove(nodeGroupId)
|
||||
}(nodeGroup.Id())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -108,14 +109,14 @@ func (d *NodeDeletionBatcher) addNodesToBucket(nodes []*apiv1.Node, nodeGroup cl
|
|||
}
|
||||
|
||||
// remove deletes nodes of a given nodeGroup. If successful, the deletion is recorded in CSR, and an event is emitted on the node.
|
||||
func (d *NodeDeletionBatcher) remove(nodeGroup cloudprovider.NodeGroup) error {
|
||||
func (d *NodeDeletionBatcher) remove(nodeGroupId string) error {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
nodes, ok := d.deletionsPerNodeGroup[nodeGroup.Id()]
|
||||
nodes, ok := d.deletionsPerNodeGroup[nodeGroupId]
|
||||
if !ok {
|
||||
return fmt.Errorf("Node Group %s is not present in the batch deleter", nodeGroup.Id())
|
||||
return fmt.Errorf("Node Group %s is not present in the batch deleter", nodeGroupId)
|
||||
}
|
||||
delete(d.deletionsPerNodeGroup, nodeGroup.Id())
|
||||
delete(d.deletionsPerNodeGroup, nodeGroupId)
|
||||
drainedNodeDeletions := make(map[string]bool)
|
||||
for _, node := range nodes {
|
||||
drainedNodeDeletions[node.Name] = d.drainedNodeDeletions[node.Name]
|
||||
|
|
@ -124,12 +125,12 @@ func (d *NodeDeletionBatcher) remove(nodeGroup cloudprovider.NodeGroup) error {
|
|||
|
||||
go func(nodes []*apiv1.Node, drainedNodeDeletions map[string]bool) {
|
||||
var result status.NodeDeleteResult
|
||||
err := deleteNodesFromCloudProvider(d.ctx, nodes, nodeGroup)
|
||||
nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, nodes)
|
||||
for _, node := range nodes {
|
||||
drain := drainedNodeDeletions[node.Name]
|
||||
if err != nil {
|
||||
result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
|
||||
CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroup.Id(), drain, d.nodeDeletionTracker, "", result)
|
||||
CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroupId, drain, d.nodeDeletionTracker, "", result)
|
||||
} else {
|
||||
RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker)
|
||||
}
|
||||
|
|
@ -140,11 +141,15 @@ func (d *NodeDeletionBatcher) remove(nodeGroup cloudprovider.NodeGroup) error {
|
|||
|
||||
// deleteNodesFromCloudProvider removes the given nodes from cloud provider. No extra pre-deletion actions are executed on
|
||||
// the Kubernetes side.
|
||||
func deleteNodesFromCloudProvider(ctx *context.AutoscalingContext, nodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup) error {
|
||||
if err := nodeGroup.DeleteNodes(nodes); err != nil {
|
||||
return errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete nodes from group %s: %v", nodeGroup.Id(), err)
|
||||
func deleteNodesFromCloudProvider(ctx *context.AutoscalingContext, nodes []*apiv1.Node) (cloudprovider.NodeGroup, error) {
|
||||
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(nodes[0])
|
||||
if err != nil {
|
||||
return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", nodes[0].Name, err)
|
||||
}
|
||||
return nil
|
||||
if err := nodeGroup.DeleteNodes(nodes); err != nil {
|
||||
return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete nodes from group %s: %v", nodeGroup.Id(), err)
|
||||
}
|
||||
return nodeGroup, nil
|
||||
}
|
||||
|
||||
func nodeScaleDownReason(node *apiv1.Node, drain bool) metrics.NodeScaleDownReason {
|
||||
|
|
|
|||
|
|
@ -197,7 +197,7 @@ func TestRemove(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
err = d.remove(nodeGroup)
|
||||
err = d.remove(nodeGroup.Id())
|
||||
if test.err {
|
||||
if err == nil {
|
||||
t.Errorf("remove() should return error, but return nil")
|
||||
|
|
|
|||
Loading…
Reference in New Issue