Revert "Parallelize creating cluster snapshot"
parent 5cd491a5a1
commit f14c532f87

This reverts the cluster snapshot parallelization: it removes the ClusterSnapshotParallelism option and the --cluster-snapshot-parallelism flag, drops the parallelized pod assignment from DeltaSnapshotStore, and restores the sequential SetClusterState implementation.
@@ -311,8 +311,6 @@ type AutoscalingOptions struct {
 	ForceDeleteLongUnregisteredNodes bool
 	// DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled.
 	DynamicResourceAllocationEnabled bool
-	// ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
-	ClusterSnapshotParallelism int
 }
 
 // KubeClientOptions specify options for kube client
@@ -254,7 +254,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
 			return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore())
 		},
 		"delta": func() clustersnapshot.ClusterSnapshot {
-			return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore(16))
+			return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore())
 		},
 	}
 	for snapshotName, snapshotFactory := range snapshots {
@@ -283,7 +283,6 @@ var (
 	checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.")
 	forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.")
 	enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
-	clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
 )
 
 func isFlagPassed(name string) bool {
@@ -464,7 +463,6 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox,
 		ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
 		DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
-		ClusterSnapshotParallelism: *clusterSnapshotParallelism,
 	}
 }
 
@@ -507,7 +505,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
 	deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
 	drainabilityRules := rules.Default(deleteOptions)
 
-	var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelism)
+	var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore()
 	if autoscalingOptions.DynamicResourceAllocationEnabled {
 		// TODO(DRA): Remove this once DeltaSnapshotStore is integrated with DRA.
 		klog.Warningf("Using BasicSnapshotStore instead of DeltaSnapshotStore because DRA is enabled. Autoscaling performance/scalability might be decreased.")
@@ -114,7 +114,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
-			clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16))
+			clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore())
 			err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
 			assert.NoError(t, err)
 			ctx := context.AutoscalingContext{
@@ -56,7 +56,7 @@ var snapshots = map[string]func() (clustersnapshot.ClusterSnapshot, error){
 		if err != nil {
 			return nil, err
 		}
-		return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true), nil
+		return NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle, true), nil
 	},
 }
 
@@ -17,13 +17,11 @@ limitations under the License.
 package store
 
 import (
-	"context"
 	"fmt"
 
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
-	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )
@@ -46,8 +44,7 @@ import (
 // pod affinity - causes scheduler framework to list pods with non-empty selector,
 // so basic caching doesn't help.
 type DeltaSnapshotStore struct {
-	data        *internalDeltaSnapshotData
-	parallelism int
+	data *internalDeltaSnapshotData
 }
 
 type deltaSnapshotStoreNodeLister DeltaSnapshotStore
@@ -140,14 +137,10 @@ func (data *internalDeltaSnapshotData) buildNodeInfoList() []*schedulerframework
 	return nodeInfoList
 }
 
-func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) (*schedulerframework.NodeInfo, error) {
+func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) error {
 	nodeInfo := schedulerframework.NewNodeInfo()
 	nodeInfo.SetNode(node)
-	err := data.addNodeInfo(nodeInfo)
-	if err != nil {
-		return nil, err
-	}
-	return nodeInfo, nil
+	return data.addNodeInfo(nodeInfo)
 }
 
 func (data *internalDeltaSnapshotData) addNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
@@ -248,24 +241,6 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e
 	return nil
 }
 
-func (data *internalDeltaSnapshotData) addPodToNode(pod *apiv1.Pod, ni *schedulerframework.NodeInfo) error {
-	ni.AddPod(pod)
-
-	// Maybe consider deleting from the list in the future. Maybe not.
-	data.clearCaches()
-	return nil
-}
-
-func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error {
-	for _, pod := range pods {
-		ni.AddPod(pod)
-	}
-
-	// Maybe consider deleting from the list in the future. Maybe not.
-	data.clearCaches()
-	return nil
-}
-
 func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error {
 	// This always clones node info, even if the pod is actually missing.
 	// Not sure if we mind, since removing non-existent pod
@@ -428,10 +403,8 @@ func (snapshot *DeltaSnapshotStore) DeviceClasses() schedulerframework.DeviceCla
 }
 
 // NewDeltaSnapshotStore creates instances of DeltaSnapshotStore.
-func NewDeltaSnapshotStore(parallelism int) *DeltaSnapshotStore {
-	snapshot := &DeltaSnapshotStore{
-		parallelism: parallelism,
-	}
+func NewDeltaSnapshotStore() *DeltaSnapshotStore {
+	snapshot := &DeltaSnapshotStore{}
 	snapshot.clear()
 	return snapshot
 }
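For orientation, here is a minimal sketch of how the zero-argument constructor is used together with SetClusterState after this revert. The helper name buildSnapshotStore and the import paths are assumptions inferred from the package names visible in the diff, not code from this commit.

	package example

	import (
		apiv1 "k8s.io/api/core/v1"
		"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
		drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
	)

	// buildSnapshotStore builds a DeltaSnapshotStore and loads the current cluster
	// state into it, mirroring how the benchmarks touched by this commit call the API.
	func buildSnapshotStore(nodes []*apiv1.Node, pods []*apiv1.Pod) (*store.DeltaSnapshotStore, error) {
		// After the revert the constructor takes no parallelism argument.
		deltaStore := store.NewDeltaSnapshotStore()
		// SetClusterState adds nodes first, then assigns scheduled pods to them sequentially.
		if err := deltaStore.SetClusterState(nodes, pods, drasnapshot.Snapshot{}); err != nil {
			return nil, err
		}
		return deltaStore, nil
	}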
@@ -444,7 +417,7 @@ func (snapshot *DeltaSnapshotStore) DraSnapshot() drasnapshot.Snapshot {
 
 // AddSchedulerNodeInfo adds a NodeInfo.
 func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
-	if _, err := snapshot.data.addNode(nodeInfo.Node()); err != nil {
+	if err := snapshot.data.addNode(nodeInfo.Node()); err != nil {
 		return err
 	}
 	for _, podInfo := range nodeInfo.Pods {
@@ -455,71 +428,24 @@ func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerfram
 	return nil
 }
 
-// setClusterStatePodsSequential sets the pods in cluster state in a sequential way.
-func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
-	for _, pod := range scheduledPods {
-		if nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]; ok {
-			if err := snapshot.data.addPodToNode(pod, nodeInfos[nodeIdx]); err != nil {
-				return err
-			}
-		}
-	}
-	return nil
-}
-
-// setClusterStatePodsParallelized sets the pods in cluster state in parallel based on snapshot.parallelism value.
-func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
-	podsForNode := make([][]*apiv1.Pod, len(nodeInfos))
-	for _, pod := range scheduledPods {
-		nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]
-		if !ok {
-			continue
-		}
-		podsForNode[nodeIdx] = append(podsForNode[nodeIdx], pod)
-	}
-
-	ctx := context.Background()
-	ctx, cancel := context.WithCancelCause(ctx)
-
-	workqueue.ParallelizeUntil(ctx, snapshot.parallelism, len(nodeInfos), func(nodeIdx int) {
-		err := snapshot.data.addPodsToNode(podsForNode[nodeIdx], nodeInfos[nodeIdx])
-		if err != nil {
-			cancel(err)
-		}
-	})
-
-	return context.Cause(ctx)
-}
-
 // SetClusterState sets the cluster state.
 func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error {
 	snapshot.clear()
 
-	nodeNameToIdx := make(map[string]int, len(nodes))
-	nodeInfos := make([]*schedulerframework.NodeInfo, len(nodes))
-	for i, node := range nodes {
-		nodeInfo, err := snapshot.data.addNode(node)
-		if err != nil {
+	knownNodes := make(map[string]bool)
+	for _, node := range nodes {
+		if err := snapshot.data.addNode(node); err != nil {
 			return err
 		}
-		nodeNameToIdx[node.Name] = i
-		nodeInfos[i] = nodeInfo
+		knownNodes[node.Name] = true
 	}
-
-	if snapshot.parallelism > 1 {
-		err := snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods)
-		if err != nil {
-			return err
-		}
-	} else {
-		// TODO(macsko): Migrate to setClusterStatePodsParallelized for parallelism == 1
-		// after making sure the implementation is always correct in CA 1.33.
-		err := snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods)
-		if err != nil {
-			return err
+	for _, pod := range scheduledPods {
+		if knownNodes[pod.Spec.NodeName] {
+			if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil {
+				return err
+			}
 		}
 	}
-
 	// TODO(DRA): Save DRA snapshot.
 	return nil
 }
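For context on what the revert removes: the deleted setClusterStatePodsParallelized used workqueue.ParallelizeUntil together with context.WithCancelCause to fan pod assignment out over a fixed number of workers and to stop on the first error. Below is a minimal, self-contained sketch of that general pattern; the item/work names are placeholders, not autoscaler code.

	package example

	import (
		"context"

		"k8s.io/client-go/util/workqueue"
	)

	// processInParallel runs work over items with the pattern used by the removed code:
	// ParallelizeUntil fans the index space out to `parallelism` workers, WithCancelCause
	// aborts remaining pieces on the first error, and context.Cause surfaces that error
	// (or nil if nothing failed).
	func processInParallel(items []string, parallelism int, work func(item string) error) error {
		ctx, cancel := context.WithCancelCause(context.Background())
		defer cancel(nil) // release the context once all workers have returned

		workqueue.ParallelizeUntil(ctx, parallelism, len(items), func(i int) {
			if err := work(items[i]); err != nil {
				cancel(err)
			}
		})

		return context.Cause(ctx)
	}

After the revert, SetClusterState instead loops over scheduledPods and calls addPod for each, as shown in the restored hunk above.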
@@ -48,7 +48,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
 	for _, tc := range testCases {
 		b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) {
 			nodes := clustersnapshot.CreateTestNodes(tc.nodeCount + 1000)
-			deltaStore := NewDeltaSnapshotStore(16)
+			deltaStore := NewDeltaSnapshotStore()
 			if err := deltaStore.SetClusterState(nodes[:tc.nodeCount], nil, drasnapshot.Snapshot{}); err != nil {
 				assert.NoError(b, err)
 			}
@@ -70,7 +70,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
 	for _, tc := range testCases {
 		b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) {
 			nodes := clustersnapshot.CreateTestNodes(tc.nodeCount)
-			deltaStore := NewDeltaSnapshotStore(16)
+			deltaStore := NewDeltaSnapshotStore()
 			if err := deltaStore.SetClusterState(nodes, nil, drasnapshot.Snapshot{}); err != nil {
 				assert.NoError(b, err)
 			}