Merge pull request #7696 from macsko/fix_data_race_while_setting_delta_cluster_state_in_parallel

Fix data race while setting delta cluster state in parallel

This commit is contained in:
commit adda3d40bb
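The diff below removes the per-node helpers that called data.clearCaches() from inside the parallel workers and instead adds pods directly via NodeInfo.AddPod, clearing the caches once in SetClusterState after all workers have finished. As context, here is a minimal, self-contained sketch of that pattern. It uses hypothetical toy types (snapshotData, podsPerNode, clearCaches), not the autoscaler's real ones: each goroutine writes only its own per-node slot, and the shared cache is cleared exactly once after the parallel section.

package main

import (
    "fmt"
    "sync"
)

// snapshotData is a hypothetical stand-in for the delta snapshot's internal data.
type snapshotData struct {
    podsPerNode [][]string     // per-node state; each worker writes only its own index
    cache       map[string]int // shared state; must not be mutated concurrently
}

// clearCaches mutates shared state. Calling it from several goroutines at once,
// as the removed addPodsToNode helper effectively did, is a data race.
func (d *snapshotData) clearCaches() {
    d.cache = nil
}

func main() {
    data := &snapshotData{
        podsPerNode: make([][]string, 3),
        cache:       map[string]int{"nodeInfoList": 1},
    }
    podsForNode := [][]string{{"pod-a"}, {"pod-b", "pod-c"}, {}}

    var wg sync.WaitGroup
    for i := range podsForNode {
        wg.Add(1)
        go func(nodeIdx int) {
            defer wg.Done()
            // Fixed pattern: only this node's slot is touched inside the worker.
            data.podsPerNode[nodeIdx] = append(data.podsPerNode[nodeIdx], podsForNode[nodeIdx]...)
            // Racy pattern (what the PR removes): calling data.clearCaches() here
            // would write the shared cache from every worker concurrently.
        }(i)
    }
    wg.Wait()

    // Clear shared caches once, after all parallel work has finished.
    data.clearCaches()
    fmt.Println(data.podsPerNode)
}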
@@ -248,24 +248,6 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e
     return nil
 }
 
-func (data *internalDeltaSnapshotData) addPodToNode(pod *apiv1.Pod, ni *schedulerframework.NodeInfo) error {
-    ni.AddPod(pod)
-
-    // Maybe consider deleting from the list in the future. Maybe not.
-    data.clearCaches()
-    return nil
-}
-
-func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error {
-    for _, pod := range pods {
-        ni.AddPod(pod)
-    }
-
-    // Maybe consider deleting from the list in the future. Maybe not.
-    data.clearCaches()
-    return nil
-}
-
 func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error {
     // This always clones node info, even if the pod is actually missing.
     // Not sure if we mind, since removing non-existent pod
@@ -456,19 +438,17 @@ func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerfram
 }
 
 // setClusterStatePodsSequential sets the pods in cluster state in a sequential way.
-func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
+func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) {
     for _, pod := range scheduledPods {
         if nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]; ok {
-            if err := snapshot.data.addPodToNode(pod, nodeInfos[nodeIdx]); err != nil {
-                return err
-            }
+            // Can add pod directly. Cache will be cleared afterwards.
+            nodeInfos[nodeIdx].AddPod(pod)
         }
     }
-    return nil
 }
 
 // setClusterStatePodsParallelized sets the pods in cluster state in parallel based on snapshot.parallelism value.
-func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
+func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) {
     podsForNode := make([][]*apiv1.Pod, len(nodeInfos))
     for _, pod := range scheduledPods {
        nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]
@@ -479,16 +459,13 @@ func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []
     }
 
     ctx := context.Background()
-    ctx, cancel := context.WithCancelCause(ctx)
-
     workqueue.ParallelizeUntil(ctx, snapshot.parallelism, len(nodeInfos), func(nodeIdx int) {
-        err := snapshot.data.addPodsToNode(podsForNode[nodeIdx], nodeInfos[nodeIdx])
-        if err != nil {
-            cancel(err)
+        nodeInfo := nodeInfos[nodeIdx]
+        for _, pod := range podsForNode[nodeIdx] {
+            // Can add pod directly. Cache will be cleared afterwards.
+            nodeInfo.AddPod(pod)
         }
     })
-
-    return context.Cause(ctx)
 }
 
 // SetClusterState sets the cluster state.
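For reference, workqueue.ParallelizeUntil, used in the hunk above, splits N work pieces across a fixed number of workers and blocks until all pieces are done; once the worker body can no longer fail, the WithCancelCause/Cause plumbing removed above has nothing left to propagate. A minimal standalone sketch of that API, assuming the standard k8s.io/client-go/util/workqueue package and a toy doubling task:

package main

import (
    "context"
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    in := []int{1, 2, 3, 4, 5}
    out := make([]int, len(in))

    // Each invocation handles exactly one piece (one index), so every worker
    // writes only its own slot of out and no extra synchronization is needed.
    workqueue.ParallelizeUntil(context.Background(), 2, len(in), func(piece int) {
        out[piece] = in[piece] * 2
    })

    fmt.Println(out) // [2 4 6 8 10]
}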
@@ -507,19 +484,16 @@ func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, schedul
     }
 
     if snapshot.parallelism > 1 {
-        err := snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods)
-        if err != nil {
-            return err
-        }
+        snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods)
     } else {
         // TODO(macsko): Migrate to setClusterStatePodsParallelized for parallelism == 1
         // after making sure the implementation is always correct in CA 1.33.
-        err := snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods)
-        if err != nil {
-            return err
-        }
+        snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods)
     }
 
+    // Clear caches after adding pods.
+    snapshot.data.clearCaches()
+
     // TODO(DRA): Save DRA snapshot.
     return nil
 }
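Races like the one fixed here are typically surfaced by Go's built-in race detector (go test -race). The following hypothetical regression test, written against the toy snapshotData type from the sketch near the top of this page rather than the autoscaler's real code, exercises the fixed pattern: per-slot writes inside the workers and a single clearCaches call afterwards. Moving clearCaches back into the goroutines would make it fail under -race.

package snapshotsketch

import (
    "sync"
    "testing"
)

// TestParallelAddPods runs the parallel add path and is intended to be run
// with `go test -race`.
func TestParallelAddPods(t *testing.T) {
    data := &snapshotData{
        podsPerNode: make([][]string, 8),
        cache:       map[string]int{"nodeInfoList": 1},
    }

    var wg sync.WaitGroup
    for i := range data.podsPerNode {
        wg.Add(1)
        go func(nodeIdx int) {
            defer wg.Done()
            // Worker touches only its own node's slot.
            data.podsPerNode[nodeIdx] = append(data.podsPerNode[nodeIdx], "pod")
            // Calling data.clearCaches() here would reintroduce the data race.
        }(i)
    }
    wg.Wait()

    // Shared cache is cleared exactly once, after the parallel section.
    data.clearCaches()

    for i, pods := range data.podsPerNode {
        if len(pods) != 1 {
            t.Errorf("node %d: expected 1 pod, got %d", i, len(pods))
        }
    }
}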