Compare commits

...

22 Commits

Author SHA1 Message Date
Volcano Bot 4ac9bc0ffd
Merge pull request #4389 from JesseStutler/fix_4299
Delete secrets permission for volcano agent
2025-06-21 08:07:11 +08:00
Volcano Bot 1098a08122
Merge pull request #4377 from zhifei92/sopport_allocate_func_for_extender
Support the allocation callback function provided by the extender.
2025-06-20 14:23:10 +08:00
JesseStutler ddecb9a75a Delete secrets permission for volcano agent
Signed-off-by: JesseStutler <chenzicong4@huawei.com>
2025-06-20 11:12:18 +08:00
Volcano Bot aa439233cc
Merge pull request #4373 from LY-today/bug-fix
fix: add getNodeStatus
2025-06-18 16:14:09 +08:00
LY-today b858207728
fix: add isNodeUnschedulable and isNodeNotReady func
Signed-off-by: LY-today <724102053@qq.com>
2025-06-18 14:55:47 +08:00
zhangzhifei16 7c3720380e feat: support the allocate callback function for extender.
Signed-off-by: zhangzhifei16 <zhangzhifei16@jd.com>

fix: Correct log formatting.

Signed-off-by: zhangzhifei16 <zhangzhifei16@jd.com>
2025-06-18 09:35:07 +08:00
Volcano Bot 7febf4dff7
Merge pull request #4378 from ElectricFish7/bug-fix
Move InitCycleState from openSession to OpenSession
2025-06-17 17:09:08 +08:00
Yuqi Wu 3c52b43750 Move InitCycleState from openSession to OpenSession
Signed-off-by: Yuqi Wu <wuyuqi22@mails.ucas.ac.cn>
2025-06-16 20:01:23 +08:00
Volcano Bot 3aa260cf38
Merge pull request #4279 from bibibox/add_preempt_proposal
Support topology aware in the preempt action
2025-06-06 16:18:57 +08:00
Box Zhang e9040d33a3 Preempt action support topology
Signed-off-by: Box Zhang <wszwbsddbk@gmail.com>
2025-06-06 15:20:51 +08:00
Box Zhang 3506a7089b Proposal: Preempt action support topology
Signed-off-by: Box Zhang <wszwbsddbk@gmail.com>
2025-06-04 11:33:01 +08:00
Volcano Bot 6e2959db6b
Merge pull request #4339 from JesseStutler/v1.12-update
Revert "Bump image to v1.12.0" in mater branch/ Update api version/ Fix queue status update
2025-06-04 10:52:56 +08:00
JesseStutler a3d19d4bea Update volcano v1.12 Kubernetes compatibility
Signed-off-by: JesseStutler <chenzicong4@huawei.com>
2025-06-04 09:33:16 +08:00
JesseStutler 50895c2c36 Add -v for all e2e testing and set longer timeout
Signed-off-by: JesseStutler <chenzicong4@huawei.com>
2025-06-04 09:33:01 +08:00
Volcano Bot d940019b9c
Merge pull request #4316 from sailorvii/doc-update
Refine vgpu user guide
2025-06-03 10:40:55 +08:00
JesseStutler 88e22aab4e Upgrade api version to v1.12.1
Signed-off-by: JesseStutler <chenzicong4@huawei.com>
2025-06-03 10:13:24 +08:00
Monokaix c7efd55dd9 Fix queue update conflicts when upgrading to new version
Signed-off-by: Monokaix <changxuzheng@huawei.com>
2025-06-03 10:13:12 +08:00
JesseStutler b505c1b310 Revert "Bump image to v1.12.0"
This reverts commit 284eaed827.

Signed-off-by: JesseStutler <chenzicong4@huawei.com>
2025-06-03 10:12:54 +08:00
Volcano Bot adab22d06a
Merge pull request #4330 from JesseStutler/v1.12-update
Bump image to v1.12.0
2025-05-30 23:16:52 +08:00
JesseStutler fb07f675fd Add -v for printing e2e logs
Signed-off-by: JesseStutler <chenzicong4@huawei.com>
2025-05-30 17:24:28 +08:00
JesseStutler 284eaed827 Bump image to v1.12.0
Signed-off-by: JesseStutler <chenzicong4@huawei.com>
2025-05-30 15:49:02 +08:00
chenw66 57c2d11654 refine vgpu user guide
Signed-off-by: chenw66 <chenw66@chinaunicom.cn>
2025-05-29 21:20:18 +08:00
35 changed files with 2123 additions and 194 deletions


@ -11,7 +11,7 @@ jobs:
e2e_scheduling_actions:
runs-on: ubuntu-24.04
name: E2E about Scheduling Actions
timeout-minutes: 40
timeout-minutes: 50
steps:
- name: Install Go
uses: actions/setup-go@v5


@ -11,7 +11,7 @@ jobs:
e2e_scheduling_basic:
runs-on: ubuntu-24.04
name: E2E about Basic Scheduling
timeout-minutes: 40
timeout-minutes: 50
steps:
- name: Install Go
uses: actions/setup-go@v5


@ -11,7 +11,7 @@ jobs:
e2e_sequence:
runs-on: ubuntu-24.04
name: E2E about Sequence
timeout-minutes: 40
timeout-minutes: 50
steps:
- name: Install Go
uses: actions/setup-go@v5


@ -11,7 +11,7 @@ jobs:
e2e_vcctl:
runs-on: ubuntu-24.04
name: E2E about Volcano CLI
timeout-minutes: 20
timeout-minutes: 50
steps:
- name: Install Go
uses: actions/setup-go@v5


@ -189,6 +189,7 @@ Please follow the guide [Volcano Dashboard](https://github.com/volcano-sh/dashbo
| Volcano v1.9 | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |- |_ |- |
| Volcano v1.10 | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ |_ |- |
| Volcano v1.11 | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ |✓ |- |
| Volcano v1.12 | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ |✓ |✓ |
| Volcano HEAD (master) | - | - | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ |✓ |✓ |
Key:

(Two binary image files added, not shown: 87 KiB and 182 KiB.)


@ -0,0 +1,98 @@
# Preempt Action Support Topology
## Motivation
In cloud-native task scheduling scenarios, preemption is a key feature to ensure timely scheduling of high-priority tasks. Compared to the K8s scheduler, Volcano's current preemption implementation is relatively simple, especially in handling affinity judgments. To improve the accuracy and efficiency of the preemption mechanism, the existing implementation needs to be optimized, particularly in supporting topology awareness.
## In Scope
- Optimize Volcano's preemption mechanism to support affinity judgments
- Improve single Pod preemption process
- Implement simulation scheduling interface to ensure simulated addition and removal of pods won't cause topology changes
## Out of Scope
- Gang scheduling preemption scenario optimization
## User Stories
### Story 1
As a cluster administrator, I want the system to accurately judge Pod affinity constraints during preemption scheduling to avoid scheduling failures caused by topology changes.
### Story 2
As a user, I expect high-priority Pod preemption to minimize impact on existing Pods while maintaining consistency of affinity rules.
### Story 3
When topology-sensitive resources like GPUs exist, the preemption process needs to consider resource topology relationships to ensure resource allocation after preemption still satisfies original topology constraints.
For example, if a node has 2 GPUs (8GB each), Pod A and Pod B each use 4GB, and Pod C needs 8GB. When Pod C needs to be scheduled, it triggers the preemption mechanism. During the simulation scheduling process, the system will try to preempt Pod A and reschedule it. There are two possible scenarios:
1. If topology changes during simulation scheduling:
- System chooses to preempt Pod A
- The predicate check for Pod C succeeds
- When simulating the re-addition of Pod A, the binpack strategy causes Pod A to be scheduled to a different GPU
- After re-adding Pod A, the predicate function check still passes for Pod C
- This means Pod C can be scheduled without actually removing Pod A
- Therefore, the preemption is considered unnecessary and fails
2. If topology remains consistent during simulation scheduling:
- System chooses to preempt Pod A
- The predicate check for Pod C succeeds
- When simulating the re-addition of Pod A, the original topology relationship is maintained
- After re-adding Pod A, the predicate function check fails for Pod C
- This confirms that Pod A must be removed for Pod C to be scheduled
- Therefore, the preemption is considered necessary and succeeds
Therefore, when implementing the preemption mechanism, it's crucial to verify the necessity of preemption by checking if the topology changes during pod re-addition would affect the scheduling of the preempting pod.
![preempt-action-support-topology-1](images/preempt-action-support-topology/preempt-action-support-topology-1.png)
## Design Detail
### Preemption Process
![preempt-action-support-topology-2](images/preempt-action-support-topology/preempt-action-support-topology-2.png)
1. Execute the predicate on all nodes that are not UnschedulableAndUnresolvable to obtain the candidate node list, and perform parallel simulation scheduling on all candidate nodes.
2. The simulation scheduling process for each node is as follows (a simplified sketch follows this list):
   1. First consider lower-priority Pods on the node as potential victims
   2. Sort the victim list (lower-priority and non-PDB-violating victims come first)
   3. Remove victims in order, add each removed one to the eviction candidates, and check whether the verification function passes
   4. Verification function: try to add the higher-priority pods (pipelined) targeting the current node and check whether they pass the predicate; then remove them and check the predicate again
   5. If the verification passes, try to add back the previous eviction candidates in PDB and priority order (to minimize impact), calling the verification function after each addition; if verification fails for a candidate, add it to the final eviction list
   6. If the final eviction list is not empty, return it
3. Sort the filtered nodes using PreemptNodeOrderFn
4. Schedule the Pod to the top-ranked node, evict the victims, and clear the nominatedNodeName of lower-priority pods that had nominated this node, moving them from pipelined back to pending
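The per-node simulation and verification loop above can be summarized with the following sketch. This is illustrative only: the `task`/`node` types and the `fits` helper are stand-ins for Volcano's `TaskInfo`, `NodeInfo`, and the simulated predicate/allocatable checks, not the actual implementation.
```go
package main

import (
	"fmt"
	"sort"
)

// Illustrative stand-ins for Volcano's TaskInfo / NodeInfo.
type task struct {
	name     string
	priority int
	cpu      int64
}

type node struct {
	capacityCPU int64
	running     []*task
}

func (n *node) idleCPU() int64 {
	idle := n.capacityCPU
	for _, t := range n.running {
		idle -= t.cpu
	}
	return idle
}

// fits plays the role of the simulated predicate/allocatable checks: can the
// preemptor be placed on the node in its current simulated state?
func (n *node) fits(p *task) bool { return n.idleCPU() >= p.cpu }

func (n *node) remove(t *task) {
	for i, x := range n.running {
		if x == t {
			n.running = append(n.running[:i], n.running[i+1:]...)
			return
		}
	}
}

func (n *node) add(t *task) { n.running = append(n.running, t) }

// selectVictims mirrors steps 2.1-2.6: remove lower-priority tasks until the
// preemptor fits, then try to re-add each removed task and keep as victims only
// those whose re-addition would break the fit again.
func selectVictims(n *node, preemptor *task) []*task {
	var candidates []*task
	for _, t := range n.running {
		if t.priority < preemptor.priority {
			candidates = append(candidates, t)
		}
	}
	// Evict the least important tasks first.
	sort.Slice(candidates, func(i, j int) bool { return candidates[i].priority < candidates[j].priority })

	var removed []*task
	for _, v := range candidates {
		n.remove(v)
		removed = append(removed, v)
		if n.fits(preemptor) {
			break
		}
	}
	if !n.fits(preemptor) {
		return nil // node unsuitable even after removing every candidate
	}

	var victims []*task
	for _, v := range removed {
		n.add(v)
		if n.fits(preemptor) {
			continue // v can be reprieved
		}
		n.remove(v)
		victims = append(victims, v) // v really has to be evicted
	}
	return victims
}

func main() {
	n := &node{capacityCPU: 4, running: []*task{
		{name: "low-a", priority: 1, cpu: 2},
		{name: "low-b", priority: 1, cpu: 2},
	}}
	preemptor := &task{name: "high", priority: 100, cpu: 2}
	for _, v := range selectVictims(n, preemptor) {
		fmt.Println("victim:", v.name)
	}
}
```
The real implementation additionally clones the node info and cycle state per worker and routes the add/remove/predicate steps through the Simulate* plugin hooks described below.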
### Key Function Modifications
- `SimulateRemoveTaskFn`: Simulate the removal of a task from a node, plugins implement this function to ensure the removal action does not cause topology changes
```go
type SimulateRemoveTaskFn func(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *TaskInfo, taskInfoToRemove *TaskInfo, nodeInfo *NodeInfo) error
```
- `SimulateAddTaskFn`: Simulate the addition of a task to a node, plugins implement this function to ensure the addition action does not cause topology changes
```go
type SimulateAddTaskFn func(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *TaskInfo, taskInfoToAdd *TaskInfo, nodeInfo *NodeInfo) error
```
- `SimulatePredicateFn`: Simulate the predicate check for a task on a node, plugins implement this function to verify if the task can be scheduled to the node while maintaining topology constraints
```go
type SimulatePredicateFn func(ctx context.Context, state *k8sframework.CycleState, task *TaskInfo, nodeInfo *NodeInfo) error
```
- `SimulateAllocatableFn`: Simulate the allocatable check for a node, plugins implement this function to verify if the queue has enough resources to schedule the task while maintaining topology constraints
```go
type SimulateAllocatableFn func(ctx context.Context, state *k8sframework.CycleState, queue *QueueInfo, task *TaskInfo) bool
```
### Limitations
- The current design focuses on single-pod preemption scenarios and does not handle complex topology changes in gang scheduling
- For complex combinations of affinity rules, multiple attempts may be needed to find the optimal solution; the performance impact of simulation scheduling needs to be evaluated in large-scale clusters
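### Configuration
The action arguments introduced by this proposal (names taken from the implementation in this PR) might be enabled in the scheduler configuration roughly as follows; all values below are illustrative and the feature is off by default:
```yaml
actions: "enqueue, allocate, preempt, backfill"
configurations:
- name: preempt
  arguments:
    enableTopologyAwarePreemption: true  # default: false
    topologyAwarePreemptWorkerNum: 16    # parallel dry-run workers
    minCandidateNodesPercentage: 10
    minCandidateNodesAbsolute: 1
    maxCandidateNodesAbsolute: 100
```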


@ -1,25 +1,63 @@
# Volcano vGPU User guide
# Volcano vGPU User Guide
## Prerequisites
## Background Knowledge of GPU Sharing Modes in Volcano
The list of prerequisites for running the Volcano device plugin is described below:
* NVIDIA drivers > 440
* nvidia-docker version > 2.0 (see how to [install](https://github.com/NVIDIA/nvidia-docker) and its [prerequisites](https://github.com/nvidia/nvidia-docker/wiki/Installation-\(version-2.0\)#prerequisites))
* docker configured with nvidia as the [default runtime](https://github.com/NVIDIA/nvidia-docker/wiki/Advanced-topics#default-runtime).
* Kubernetes version >= 1.16
* Volcano version >= 1.9
Volcano supports **two GPU sharing modes** for virtual GPU (vGPU) scheduling:
## Environment setup
### 1. HAMI-core (Software-based vGPU)
### Install volcano
**Description**:
Leverages **VCUDA**, a CUDA API hijacking technique to enforce GPU core and memory usage limits, enabling **software-level virtual GPU slicing**.
Refer to [Install Guide](../../installer/README.md) to install volcano.
**Use case**:
Ideal for environments requiring **fine-grained GPU sharing**. Compatible with all GPU types.
After installation, update the scheduler configuration:
---
```shell script
kubectl edit cm -n volcano-system volcano-scheduler-configmap
### 2. Dynamic MIG (Hardware-level GPU Slicing)
**Description**:
Utilizes **NVIDIA's MIG (Multi-Instance GPU)** technology to partition a physical GPU into isolated instances with **hardware-level performance guarantees**.
**Use case**:
Best for **performance-sensitive** workloads. Requires **MIG-capable GPUs** (e.g., A100, H100).
---
GPU sharing mode is a per-node configuration. Volcano supports heterogeneous clusters (i.e., some nodes use HAMi-core while others use dynamic MIG); see [volcano-vgpu-device-plugin](https://github.com/Project-HAMi/volcano-vgpu-device-plugin) for configuration and details.
## Installation
To enable vGPU scheduling, the following components must be set up based on the selected mode:
### Common Requirements
* **Prerequisites**:
* NVIDIA driver > 440
* nvidia-docker > 2.0
* Docker configured with `nvidia` as the default runtime
* Kubernetes >= 1.16
* Volcano >= 1.9
* **Install Volcano**:
* Follow instructions in [Volcano Installer Guide](https://github.com/volcano-sh/volcano?tab=readme-ov-file#quick-start-guide)
* **Install Device Plugin**:
* Deploy [`volcano-vgpu-device-plugin`](https://github.com/Project-HAMi/volcano-vgpu-device-plugin)
**Note:** the [vgpu device plugin yaml](https://github.com/Project-HAMi/volcano-vgpu-device-plugin/blob/main/volcano-vgpu-device-plugin.yml) also includes the ***Node GPU mode*** and the ***MIG geometry*** configuration. Please refer to the [vgpu device plugin config](https://github.com/Project-HAMi/volcano-vgpu-device-plugin/blob/main/doc/config.md).
* **Validate Setup**:
Ensure node allocatable resources include:
```yaml
volcano.sh/vgpu-memory: "89424"
volcano.sh/vgpu-number: "8"
```
* **Scheduler Config Update**:
```yaml
kind: ConfigMap
@ -32,106 +70,121 @@ data:
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: drf
- name: predicates
- name: deviceshare
arguments:
deviceshare.VGPUEnable: true # enable vgpu
- name: predicates
- name: proportion
- name: nodeorder
- name: binpack
deviceshare.VGPUEnable: true # enable vgpu plugin
deviceshare.SchedulePolicy: binpack # scheduling policy. binpack / spread
```
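For reference, a complete `volcano-scheduler-configmap` with the deviceshare plugin enabled might look like the following (illustrative; adjust the plugin tiers to match your deployment):
```yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: volcano-scheduler-configmap
  namespace: volcano-system
data:
  volcano-scheduler.conf: |
    actions: "enqueue, allocate, backfill"
    tiers:
    - plugins:
      - name: priority
      - name: gang
      - name: conformance
    - plugins:
      - name: drf
      - name: deviceshare
        arguments:
          deviceshare.VGPUEnable: true        # enable vgpu plugin
          deviceshare.SchedulePolicy: binpack # scheduling policy: binpack / spread
      - name: predicates
      - name: proportion
      - name: nodeorder
      - name: binpack
```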
### Install Volcano device plugin
Check with:
Please refer to [volcano vgpu device plugin](https://github.com/Project-HAMi/volcano-vgpu-device-plugin?tab=readme-ov-file#enabling-gpu-support-in-kubernetes)
### Verify environment is ready
Check the node status; the environment is ready if `volcano.sh/vgpu-memory` and `volcano.sh/vgpu-number` are included in the allocatable resources.
```shell script
$ kubectl get node {node name} -oyaml
...
status:
addresses:
- address: 172.17.0.3
type: InternalIP
- address: volcano-control-plane
type: Hostname
allocatable:
cpu: "4"
ephemeral-storage: 123722704Ki
hugepages-1Gi: "0"
hugepages-2Mi: "0"
memory: 8174332Ki
pods: "110"
volcano.sh/vgpu-memory: "89424"
volcano.sh/vgpu-number: "8" # GPU resource
capacity:
cpu: "4"
ephemeral-storage: 123722704Ki
hugepages-1Gi: "0"
hugepages-2Mi: "0"
memory: 8174332Ki
pods: "110"
volcano.sh/vgpu-memory: "89424"
volcano.sh/vgpu-number: "8" # GPU resource
```bash
kubectl get node {node-name} -o yaml
```
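For a quicker check than scanning the full node YAML, the vGPU resources can be filtered from the allocatable list (illustrative command):
```bash
kubectl get node {node-name} -o jsonpath='{.status.allocatable}' | tr ',' '\n' | grep vgpu
```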
### Running GPU Sharing Jobs
---
vGPU resources can be requested by setting `volcano.sh/vgpu-number`, `volcano.sh/vgpu-cores`, and `volcano.sh/vgpu-memory` in `resources.limits`.
### HAMI-core Usage
```shell script
$ cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
* **Pod Spec**:
```yaml
metadata:
name: gpu-pod1
name: hami-pod
annotations:
volcano.sh/vgpu-mode: "hami-core"
spec:
schedulerName: volcano
containers:
- name: cuda-container
image: nvidia/cuda:9.0-devel
command: ["sleep"]
args: ["100000"]
resources:
limits:
volcano.sh/vgpu-number: 2 # requesting 2 GPU cards
volcano.sh/vgpu-memory: 3000 # (optional) each vGPU uses 3G device memory
volcano.sh/vgpu-cores: 50 # (optional) each vGPU uses 50% of core
EOF
- name: cuda-container
image: nvidia/cuda:9.0-devel
resources:
limits:
volcano.sh/vgpu-number: 1 # requesting 1 GPU card
volcano.sh/vgpu-cores: 50 # (optional) each vGPU uses 50% of GPU cores
volcano.sh/vgpu-memory: 3000 # (optional) each vGPU uses 3G of GPU memory
```
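For completeness, a full manifest can be applied the same way as in the earlier example; the image and resource values below are illustrative:
```bash
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: hami-pod
  annotations:
    volcano.sh/vgpu-mode: "hami-core"
spec:
  schedulerName: volcano
  containers:
  - name: cuda-container
    image: nvidia/cuda:9.0-devel
    command: ["sleep"]
    args: ["100000"]
    resources:
      limits:
        volcano.sh/vgpu-number: 1    # request 1 vGPU
        volcano.sh/vgpu-cores: 50    # (optional) 50% of GPU cores per vGPU
        volcano.sh/vgpu-memory: 3000 # (optional) 3G of GPU memory per vGPU
EOF
```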
You can validate device memory using `nvidia-smi` inside the container:
---
![img](https://github.com/Project-HAMi/volcano-vgpu-device-plugin/blob/main/doc/hard_limit.jpg)
### Dynamic MIG Usage
> **WARNING:** *If you don't request GPUs when using the device plugin with NVIDIA images, all
> the GPUs on the machine will be exposed inside your container.
> The number of vGPUs used by a container cannot exceed the number of GPUs on that node.*
* **Enable MIG Mode**:
### Monitor
If you need to use MIG (Multi-Instance GPU), you must run the following command on the GPU node.
volcano-scheduler-metrics records each GPU's usage and limits; visit the following address to get these metrics.
```
curl {volcano scheduler cluster ip}:8080/metrics
```bash
sudo nvidia-smi -mig 1
```
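To confirm the mode took effect (a GPU reset may be required on some systems), the current MIG mode can be queried, for example:
```bash
nvidia-smi --query-gpu=index,mig.mode.current --format=csv
```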
You can also collect the **GPU utilization**, **GPU memory usage**, **pods' GPU memory limitations** and **pods' GPU memory usage** metrics on nodes by visiting the following addresses:
* **Geometry Config (Optional)**:
The volcano-vgpu-device-plugin automatically generates an initial MIG configuration, which is stored in the `volcano-vgpu-device-config` ConfigMap under the `kube-system` namespace. You can customize this configuration as needed. For more details, refer to the [vgpu device plugin yaml](https://github.com/Project-HAMi/volcano-vgpu-device-plugin/blob/main/volcano-vgpu-device-plugin.yml).
* **Pod Spec with MIG Annotation**:
```yaml
metadata:
name: mig-pod
annotations:
volcano.sh/vgpu-mode: "mig"
spec:
schedulerName: volcano
containers:
- name: cuda-container
image: nvidia/cuda:9.0-devel
resources:
limits:
volcano.sh/vgpu-number: 1
volcano.sh/vgpu-memory: 3000
```
curl {volcano device plugin pod ip}:9394/metrics
Note: The actual memory allocated depends on the best-fit MIG slice (e.g., a 3GB request may be served by a 5GB slice).
---
## Scheduler Mode Selection
* **Explicit Mode**:
* Use the annotation `volcano.sh/vgpu-mode` to force HAMI-core or MIG mode.
* If the annotation is absent, the scheduler selects the mode based on resource fit and policy.
* **Scheduling Policy**:
* Policies such as `binpack` or `spread` influence node selection.
---
## Summary Table
| Mode | Isolation | MIG GPU Required | Annotation | Core/Memory Control | Recommended For |
| ----------- | ---------------- | ---------------- | ---------- | ------------------- | -------------------------- |
| HAMI-core | Software (VCUDA) | No | No | Yes | General workloads |
| Dynamic MIG | Hardware | Yes | Yes | MIG-controlled | Performance-sensitive jobs |
---
## Monitoring
* **Scheduler Metrics**:
```bash
curl http://<volcano-scheduler-ip>:8080/metrics
```
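To narrow the output to vGPU-related series (metric names vary by version), the response can be filtered, for example:
```bash
curl -s http://<volcano-scheduler-ip>:8080/metrics | grep -i vgpu
```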
![img](https://github.com/Project-HAMi/volcano-vgpu-device-plugin/blob/main/doc/vgpu_device_plugin_metrics.png)
# Issues and Contributing
* **Device Plugin Metrics**:
```bash
curl http://<plugin-pod-ip>:9394/metrics
```
Metrics include GPU utilization, pod memory usage, and limits.
---
## Issues and Contributions
* File bugs: [Volcano Issues](https://github.com/volcano-sh/volcano/issues)
* Contribute: [Pull Requests Guide](https://help.github.com/articles/using-pull-requests/)
* You can report a bug by [filing a new issue](https://github.com/volcano-sh/volcano/issues)
* You can contribute by opening a [pull request](https://help.github.com/articles/using-pull-requests/)

go.mod

@ -49,7 +49,7 @@ require (
sigs.k8s.io/controller-runtime v0.13.0
sigs.k8s.io/yaml v1.4.0
stathat.com/c/consistent v1.0.0
volcano.sh/apis v1.11.1-0.20250526091449-59e96f6abe4f
volcano.sh/apis v1.12.1
)
require (

go.sum

@ -528,5 +528,5 @@ sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
volcano.sh/apis v1.11.1-0.20250526091449-59e96f6abe4f h1:3CLus5h9cv42O6VgbJxA2bFskauMQJKMxJhKTKSdGNc=
volcano.sh/apis v1.11.1-0.20250526091449-59e96f6abe4f/go.mod h1:0XNNnIOevJSYNiXRmwhXUrYCcCcWcBeTY0nxrlkk03A=
volcano.sh/apis v1.12.1 h1:yq5dVj/g21vnWObCIKsJKPhMoThpzDrHDD/GMouYVxk=
volcano.sh/apis v1.12.1/go.mod h1:0XNNnIOevJSYNiXRmwhXUrYCcCcWcBeTY0nxrlkk03A=


@ -156,31 +156,31 @@ case ${E2E_TYPE} in
;;
"JOBP")
echo "Running parallel job e2e suite..."
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --nodes=4 --compilers=4 --randomize-all --randomize-suites --fail-on-pending --cover --trace --race --slow-spec-threshold='30s' --progress ./test/e2e/jobp/
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --nodes=4 --compilers=4 --randomize-all --randomize-suites --fail-on-pending --cover --trace --race --slow-spec-threshold='30s' --progress ./test/e2e/jobp/
;;
"JOBSEQ")
echo "Running sequence job e2e suite..."
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress ./test/e2e/jobseq/
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress ./test/e2e/jobseq/
;;
"SCHEDULINGBASE")
echo "Running scheduling base e2e suite..."
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress ./test/e2e/schedulingbase/
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress ./test/e2e/schedulingbase/
;;
"SCHEDULINGACTION")
echo "Running scheduling action e2e suite..."
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress ./test/e2e/schedulingaction/
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress ./test/e2e/schedulingaction/
;;
"VCCTL")
echo "Running vcctl e2e suite..."
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress ./test/e2e/vcctl/
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress ./test/e2e/vcctl/
;;
"STRESS")
echo "Running stress e2e suite..."
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress ./test/e2e/stress/
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress ./test/e2e/stress/
;;
"DRA")
echo "Running dra e2e suite..."
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress --focus="DRA E2E Test" ./test/e2e/dra/
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress --focus="DRA E2E Test" ./test/e2e/dra/
;;
esac


@ -210,9 +210,6 @@ rules:
- apiGroups: [""]
resources: [ "nodes", "nodes/status" ]
verbs: [ "get", "list", "watch", "update", "patch" ]
- apiGroups: [ "" ]
resources: [ "secrets" ]
verbs: [ "get", "list", "watch" ]
- apiGroups: [ "" ]
resources: [ "configmaps" ]
verbs: [ "get", "list", "watch", "create", "update" ]


@ -91,7 +91,7 @@ rules:
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues/status"]
verbs: ["patch"]
verbs: ["update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["podgroups"]
verbs: ["list", "watch", "update"]


@ -19,9 +19,6 @@ rules:
- apiGroups: [""]
resources: [ "nodes", "nodes/status" ]
verbs: [ "get", "list", "watch", "update", "patch" ]
- apiGroups: [ "" ]
resources: [ "secrets" ]
verbs: [ "get", "list", "watch" ]
- apiGroups: [ "" ]
resources: [ "configmaps" ]
verbs: [ "get", "list", "watch", "create", "update" ]


@ -4665,7 +4665,7 @@ rules:
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues/status"]
verbs: ["patch"]
verbs: ["update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["podgroups"]
verbs: ["list", "watch", "update"]


@ -17,10 +17,23 @@ limitations under the License.
package preempt
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
k8sutil "k8s.io/kubernetes/pkg/scheduler/util"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/conf"
@ -29,13 +42,37 @@ import (
"volcano.sh/volcano/pkg/scheduler/util"
)
const (
EnableTopologyAwarePreemptionKey = "enableTopologyAwarePreemption"
TopologyAwarePreemptWorkerNumKey = "topologyAwarePreemptWorkerNum"
MinCandidateNodesPercentageKey = "minCandidateNodesPercentage"
MinCandidateNodesAbsoluteKey = "minCandidateNodesAbsolute"
MaxCandidateNodesAbsoluteKey = "maxCandidateNodesAbsolute"
)
type Action struct {
ssn *framework.Session
enablePredicateErrorCache bool
enableTopologyAwarePreemption bool
topologyAwarePreemptWorkerNum int
minCandidateNodesPercentage int
minCandidateNodesAbsolute int
maxCandidateNodesAbsolute int
}
func New() *Action {
return &Action{
enablePredicateErrorCache: true,
enablePredicateErrorCache: true,
enableTopologyAwarePreemption: false,
topologyAwarePreemptWorkerNum: 16,
minCandidateNodesPercentage: 10,
minCandidateNodesAbsolute: 1,
maxCandidateNodesAbsolute: 100,
}
}
@ -48,6 +85,12 @@ func (pmpt *Action) Initialize() {}
func (pmpt *Action) parseArguments(ssn *framework.Session) {
arguments := framework.GetArgOfActionFromConf(ssn.Configurations, pmpt.Name())
arguments.GetBool(&pmpt.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
arguments.GetBool(&pmpt.enableTopologyAwarePreemption, EnableTopologyAwarePreemptionKey)
arguments.GetInt(&pmpt.topologyAwarePreemptWorkerNum, TopologyAwarePreemptWorkerNumKey)
arguments.GetInt(&pmpt.minCandidateNodesPercentage, MinCandidateNodesPercentageKey)
arguments.GetInt(&pmpt.minCandidateNodesAbsolute, MinCandidateNodesAbsoluteKey)
arguments.GetInt(&pmpt.maxCandidateNodesAbsolute, MaxCandidateNodesAbsoluteKey)
pmpt.ssn = ssn
}
func (pmpt *Action) Execute(ssn *framework.Session) {
@ -234,17 +277,28 @@ func (pmpt *Action) preempt(
return false, err
}
assigned := false
if err := ssn.PrePredicateFn(preemptor); err != nil {
return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
}
predicateFn := ssn.PredicateForPreemptAction
// we should filter out nodes that got UnschedulableAndUnresolvable status in the allocate action
allNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(preemptor)
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, pmpt.enablePredicateErrorCache)
allNodes := ssn.FilterOutUnschedulableAndUnresolvableNodesForTask(preemptor)
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateForPreemptAction, pmpt.enablePredicateErrorCache)
if pmpt.enableTopologyAwarePreemption {
return pmpt.topologyAwarePreempt(ssn, stmt, preemptor, filter, predicateNodes)
}
return pmpt.normalPreempt(ssn, stmt, preemptor, filter, predicateNodes)
}
func (pmpt *Action) normalPreempt(
ssn *framework.Session,
stmt *framework.Statement,
preemptor *api.TaskInfo,
filter func(*api.TaskInfo) bool,
predicateNodes []*api.NodeInfo,
) (bool, error) {
nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
selectedNodes := util.SortNodes(nodeScores)
@ -256,6 +310,8 @@ func (pmpt *Action) preempt(
currentQueue := ssn.Queues[job.Queue]
assigned := false
for _, node := range selectedNodes {
klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
preemptor.Namespace, preemptor.Name, node.Name)
@ -340,5 +396,578 @@ func (pmpt *Action) taskEligibleToPreempt(preemptor *api.TaskInfo) error {
return fmt.Errorf("not eligible to preempt other tasks due to preemptionPolicy is Never")
}
nomNodeName := preemptor.Pod.Status.NominatedNodeName
if len(nomNodeName) > 0 {
nodeInfo, ok := pmpt.ssn.Nodes[nomNodeName]
if !ok {
return fmt.Errorf("not eligible due to the pod's nominated node is not found in the session")
}
err := pmpt.ssn.PredicateFn(preemptor, nodeInfo)
if err == nil {
return fmt.Errorf("not eligible due to the pod's nominated node is already schedulable, which should not happen as preemption means no node is schedulable")
}
fitError, ok := err.(*api.FitError)
if !ok {
return fmt.Errorf("not eligible due to the predicate returned a non-FitError error, the error is: %v", err)
}
// If the pod's nominated node is considered as UnschedulableAndUnresolvable by the predicate,
// then the pod should be considered for preempting again.
if fitError.Status.ContainsUnschedulableAndUnresolvable() {
return nil
}
preemptorPodPriority := PodPriority(preemptor.Pod)
for _, p := range nodeInfo.Pods() {
if PodPriority(p) < preemptorPodPriority && podTerminatingByPreemption(p) {
// There is a terminating pod on the nominated node.
return fmt.Errorf("not eligible due to a terminating pod caused by preemption on the nominated node")
}
}
}
return nil
}
func (pmpt *Action) topologyAwarePreempt(
ssn *framework.Session,
stmt *framework.Statement,
preemptor *api.TaskInfo,
filter func(*api.TaskInfo) bool,
predicateNodes []*api.NodeInfo,
) (bool, error) {
// Find all preemption candidates.
candidates, nodeToStatusMap, err := pmpt.findCandidates(preemptor, filter, predicateNodes, stmt)
if err != nil && len(candidates) == 0 {
return false, err
}
// Return error when there are no candidates that fit the pod.
if len(candidates) == 0 {
// Specify nominatedNodeName to clear the pod's nominatedNodeName status, if applicable.
return false, fmt.Errorf("no candidates that fit the pod, the status of the nodes are %v", nodeToStatusMap)
}
// Find the best candidate.
bestCandidate := SelectCandidate(candidates)
if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
return false, fmt.Errorf("no candidate node for preemption")
}
if status := prepareCandidate(bestCandidate, preemptor.Pod, stmt, ssn); !status.IsSuccess() {
return false, fmt.Errorf("failed to prepare candidate: %v", status)
}
if err := stmt.Pipeline(preemptor, bestCandidate.Name(), true); err != nil {
klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
preemptor.Namespace, preemptor.Name, bestCandidate.Name())
if rollbackErr := stmt.UnPipeline(preemptor); rollbackErr != nil {
klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.",
preemptor.UID, bestCandidate.Name(), ssn.UID, rollbackErr)
}
}
return true, nil
}
func (pmpt *Action) findCandidates(preemptor *api.TaskInfo, filter func(*api.TaskInfo) bool, predicateNodes []*api.NodeInfo, stmt *framework.Statement) ([]*candidate, map[string]api.Status, error) {
if len(predicateNodes) == 0 {
klog.V(3).Infof("No nodes are eligible to preempt task %s/%s", preemptor.Namespace, preemptor.Name)
return nil, nil, nil
}
klog.Infof("the predicateNodes number is %d", len(predicateNodes))
nodeToStatusMap := make(map[string]api.Status)
offset, numCandidates := pmpt.GetOffsetAndNumCandidates(len(predicateNodes))
candidates, nodeStatuses, err := pmpt.DryRunPreemption(preemptor, predicateNodes, offset, numCandidates, filter, stmt)
for node, nodeStatus := range nodeStatuses {
nodeToStatusMap[node] = nodeStatus
}
return candidates, nodeToStatusMap, err
}
// prepareCandidate evicts the victim pods before nominating the selected candidate
func prepareCandidate(c *candidate, pod *v1.Pod, stmt *framework.Statement, ssn *framework.Session) *api.Status {
for _, victim := range c.Victims() {
klog.V(3).Infof("Try to preempt Task <%s/%s> for Task <%s/%s>",
victim.Namespace, victim.Name, pod.Namespace, pod.Name)
if err := stmt.Evict(victim, "preempt"); err != nil {
klog.Errorf("Failed to preempt Task <%s/%s> for Task <%s/%s>: %v",
victim.Namespace, victim.Name, pod.Namespace, pod.Name, err)
return api.AsStatus(err)
}
}
metrics.RegisterPreemptionAttempts()
return nil
}
// podTerminatingByPreemption returns true if the pod is in the termination state caused by preempt action.
func podTerminatingByPreemption(p *v1.Pod) bool {
if p.DeletionTimestamp == nil {
return false
}
for _, condition := range p.Status.Conditions {
if condition.Type == v1.DisruptionTarget {
return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByScheduler
}
}
return false
}
// PodPriority returns priority of the given pod.
func PodPriority(pod *v1.Pod) int32 {
if pod.Spec.Priority != nil {
return *pod.Spec.Priority
}
// When priority of a running pod is nil, it means it was created at a time
// that there was no global default priority class and the priority class
// name of the pod was empty. So, we resolve to the static default priority.
return 0
}
// calculateNumCandidates returns the number of candidates the FindCandidates
// method must produce from dry running based on the constraints given by
// <minCandidateNodesPercentage> and <minCandidateNodesAbsolute>. The number of
// candidates returned will never be greater than <numNodes>.
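// For example, with 200 eligible nodes and the defaults (10%, min 1, max 100), this returns 20.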
func (pmpt *Action) calculateNumCandidates(numNodes int) int {
n := (numNodes * pmpt.minCandidateNodesPercentage) / 100
if n < pmpt.minCandidateNodesAbsolute {
n = pmpt.minCandidateNodesAbsolute
}
if n > pmpt.maxCandidateNodesAbsolute {
n = pmpt.maxCandidateNodesAbsolute
}
if n > numNodes {
n = numNodes
}
return n
}
// offset is used to randomly select a starting point in the potentialNodes array.
// This helps distribute the preemption checks across different nodes and avoid
// always starting from the beginning of the node list, which could lead to
// uneven distribution of preemption attempts.
// GetOffsetAndNumCandidates chooses a random offset and calculates the number
// of candidates that should be shortlisted for dry running preemption.
func (pmpt *Action) GetOffsetAndNumCandidates(numNodes int) (int, int) {
return rand.Intn(numNodes), pmpt.calculateNumCandidates(numNodes)
}
func (pmpt *Action) DryRunPreemption(preemptor *api.TaskInfo, potentialNodes []*api.NodeInfo, offset, numCandidates int, filter func(*api.TaskInfo) bool, stmt *framework.Statement) ([]*candidate, map[string]api.Status, error) {
candidates := newCandidateList(numCandidates)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nodeStatuses := make(map[string]api.Status)
var statusesLock sync.Mutex
var errs []error
job, found := pmpt.ssn.Jobs[preemptor.Job]
if !found {
return nil, nil, fmt.Errorf("not found Job %s in Session", preemptor.Job)
}
currentQueue := pmpt.ssn.Queues[job.Queue]
state := pmpt.ssn.GetCycleState(preemptor.UID)
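// checkNode dry-runs preemption on a single candidate node. It works on clones of
// the node info and cycle state so that parallel workers do not interfere with each other.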
checkNode := func(i int) {
nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
stateCopy := state.Clone()
victims, status := SelectVictimsOnNode(ctx, stateCopy, preemptor, currentQueue, nodeInfoCopy, pmpt.ssn, filter, stmt)
if status.IsSuccess() && len(victims) != 0 {
c := &candidate{
victims: victims,
name: nodeInfoCopy.Name,
}
candidates.add(c)
if candidates.size() >= numCandidates {
cancel()
}
return
}
if status.IsSuccess() && len(victims) == 0 {
status = api.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Name))
}
statusesLock.Lock()
if status.Code == api.Error {
errs = append(errs, status.AsError())
}
nodeStatuses[nodeInfoCopy.Name] = *status
statusesLock.Unlock()
}
workqueue.ParallelizeUntil(ctx, pmpt.topologyAwarePreemptWorkerNum, len(potentialNodes), checkNode)
return candidates.get(), nodeStatuses, utilerrors.NewAggregate(errs)
}
type candidate struct {
victims []*api.TaskInfo
name string
}
// Victims returns s.victims.
func (s *candidate) Victims() []*api.TaskInfo {
return s.victims
}
// Name returns s.name.
func (s *candidate) Name() string {
return s.name
}
type candidateList struct {
idx int32
items []*candidate
}
func newCandidateList(size int) *candidateList {
return &candidateList{idx: -1, items: make([]*candidate, size)}
}
// add adds a new candidate to the internal array atomically.
func (cl *candidateList) add(c *candidate) {
if idx := atomic.AddInt32(&cl.idx, 1); idx < int32(len(cl.items)) {
cl.items[idx] = c
}
}
// size returns the number of candidates stored. Note that some add() operations
// might still be executing when this is called, so care must be taken to
// ensure that all add() operations complete before accessing the elements of
// the list.
func (cl *candidateList) size() int {
n := int(atomic.LoadInt32(&cl.idx) + 1)
if n >= len(cl.items) {
n = len(cl.items)
}
return n
}
// get returns the internal candidate array. This function is NOT atomic and
// assumes that all add() operations have been completed.
func (cl *candidateList) get() []*candidate {
return cl.items[:cl.size()]
}
// SelectVictimsOnNode finds minimum set of pods on the given node that should be preempted in order to make enough room
// for "pod" to be scheduled.
func SelectVictimsOnNode(
ctx context.Context,
state *k8sframework.CycleState,
preemptor *api.TaskInfo,
currentQueue *api.QueueInfo,
nodeInfo *api.NodeInfo,
ssn *framework.Session,
filter func(*api.TaskInfo) bool,
stmt *framework.Statement,
) ([]*api.TaskInfo, *api.Status) {
var potentialVictims []*api.TaskInfo
removeTask := func(rti *api.TaskInfo) error {
err := ssn.SimulateRemoveTaskFn(ctx, state, preemptor, rti, nodeInfo)
if err != nil {
return err
}
if err := nodeInfo.RemoveTask(rti); err != nil {
return err
}
return nil
}
addTask := func(ati *api.TaskInfo) error {
err := ssn.SimulateAddTaskFn(ctx, state, preemptor, ati, nodeInfo)
if err != nil {
return err
}
if err := nodeInfo.AddTask(ati); err != nil {
return err
}
return nil
}
var preemptees []*api.TaskInfo
for _, task := range nodeInfo.Tasks {
if filter == nil {
preemptees = append(preemptees, task.Clone())
} else if filter(task) {
preemptees = append(preemptees, task.Clone())
}
}
klog.V(3).Infof("all preemptees: %v", preemptees)
allVictims := ssn.Preemptable(preemptor, preemptees)
metrics.UpdatePreemptionVictimsCount(len(allVictims))
if err := util.ValidateVictims(preemptor, nodeInfo, allVictims); err != nil {
klog.V(3).Infof("No validated victims on Node <%s>: %v", nodeInfo.Name, err)
return nil, api.AsStatus(fmt.Errorf("no validated victims on Node <%s>: %v", nodeInfo.Name, err))
}
klog.V(3).Infof("allVictims: %v", allVictims)
// Sort all victims by pod priority from high to low, which ensures that
// higher-priority pods are reprieved first.
sort.Slice(allVictims, func(i, j int) bool { return k8sutil.MoreImportantPod(allVictims[i].Pod, allVictims[j].Pod) })
victimsQueue := ssn.BuildVictimsPriorityQueue(allVictims, preemptor)
for !victimsQueue.Empty() {
task := victimsQueue.Pop().(*api.TaskInfo)
potentialVictims = append(potentialVictims, task)
if err := removeTask(task); err != nil {
return nil, api.AsStatus(err)
}
if ssn.SimulateAllocatableFn(ctx, state, currentQueue, preemptor) && preemptor.InitResreq.LessEqual(nodeInfo.FutureIdle(), api.Zero) {
if err := ssn.SimulatePredicateFn(ctx, state, preemptor, nodeInfo); err == nil {
klog.V(3).Infof("Pod %v/%v can be scheduled on node %v after preempt %v/%v, stop evicting more pods", preemptor.Namespace, preemptor.Name, nodeInfo.Name, task.Namespace, task.Name)
break
}
}
}
// No potential victims are found, and so we don't need to evaluate the node again since its state didn't change.
if len(potentialVictims) == 0 {
return nil, api.AsStatus(fmt.Errorf("no preemption victims found for incoming pod"))
}
// If the new pod does not fit after removing all potential victim pods,
// we are almost done and this node is not suitable for preemption. The only
// condition that we could check is if the "pod" is failing to schedule due to
// inter-pod affinity to one or more victims, but we have decided not to
// support this case for performance reasons. Having affinity to lower
// priority pods is not a recommended configuration anyway.
if err := ssn.SimulatePredicateFn(ctx, state, preemptor, nodeInfo); err != nil {
return nil, api.AsStatus(fmt.Errorf("failed to predicate pod %s/%s on node %s: %v", preemptor.Namespace, preemptor.Name, nodeInfo.Name, err))
}
var victims []*api.TaskInfo
klog.V(3).Infof("potentialVictims---: %v, nodeInfo: %v", potentialVictims, nodeInfo.Name)
// TODO: consider the PDB violation here
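// reprievePod simulates adding a previously removed victim back to the node; if the
// preemptor still fits, the victim is reprieved, otherwise it is removed again and
// recorded as a real victim.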
reprievePod := func(pi *api.TaskInfo) (bool, error) {
if err := addTask(pi); err != nil {
klog.ErrorS(err, "Failed to add task", "task", klog.KObj(pi.Pod))
return false, err
}
var fits bool
if ssn.SimulateAllocatableFn(ctx, state, currentQueue, preemptor) && preemptor.InitResreq.LessEqual(nodeInfo.FutureIdle(), api.Zero) {
err := ssn.SimulatePredicateFn(ctx, state, preemptor, nodeInfo)
fits = err == nil
}
if !fits {
if err := removeTask(pi); err != nil {
return false, err
}
victims = append(victims, pi)
klog.V(3).Info("Pod is a potential preemption victim on node", "pod", klog.KObj(pi.Pod), "node", klog.KObj(nodeInfo.Node))
}
klog.Infof("reprievePod for task: %v, fits: %v", pi.Name, fits)
return fits, nil
}
// Now we try to reprieve non-violating victims.
for _, p := range potentialVictims {
if _, err := reprievePod(p); err != nil {
return nil, api.AsStatus(err)
}
}
klog.Infof("victims: %v", victims)
return victims, &api.Status{
Reason: "",
}
}
// SelectCandidate chooses the best-fit candidate from given <candidates> and return it.
// NOTE: This method is exported for easier testing in default preemption.
func SelectCandidate(candidates []*candidate) *candidate {
if len(candidates) == 0 {
return nil
}
if len(candidates) == 1 {
return candidates[0]
}
victimsMap := CandidatesToVictimsMap(candidates)
scoreFuncs := OrderedScoreFuncs(victimsMap)
candidateNode := pickOneNodeForPreemption(victimsMap, scoreFuncs)
// Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
// preemption plugins that exercise different candidates on the same nominated node.
if victims := victimsMap[candidateNode]; victims != nil {
return &candidate{
victims: victims,
name: candidateNode,
}
}
// We shouldn't reach here.
klog.Error(errors.New("no candidate selected"), "Should not reach here", "candidates", candidates)
// To not break the whole flow, return the first candidate.
return candidates[0]
}
func CandidatesToVictimsMap(candidates []*candidate) map[string][]*api.TaskInfo {
m := make(map[string][]*api.TaskInfo, len(candidates))
for _, c := range candidates {
m[c.Name()] = c.Victims()
}
return m
}
// TODO: Consider exposing score functions to plugins in the future
func OrderedScoreFuncs(nodesToVictims map[string][]*api.TaskInfo) []func(node string) int64 {
return nil
}
// pickOneNodeForPreemption chooses one node among the given nodes.
// It assumes pods in each map entry are ordered by decreasing priority.
// If the scoreFuncs is not empty, It picks a node based on score scoreFuncs returns.
// If the scoreFuncs is empty,
// It picks a node based on the following criteria:
// 1. A node with minimum number of PDB violations.
// 2. A node with minimum highest priority victim is picked.
// 3. Ties are broken by sum of priorities of all victims.
// 4. If there are still ties, node with the minimum number of victims is picked.
// 5. If there are still ties, node with the latest start time of all highest priority victims is picked.
// 6. If there are still ties, the first such node is picked (sort of randomly).
// The 'minNodes1' and 'minNodes2' are being reused here to save the memory
// allocation and garbage collection time.
func pickOneNodeForPreemption(nodesToVictims map[string][]*api.TaskInfo, scoreFuncs []func(node string) int64) string {
if len(nodesToVictims) == 0 {
return ""
}
allCandidates := make([]string, 0, len(nodesToVictims))
for node := range nodesToVictims {
allCandidates = append(allCandidates, node)
}
if len(scoreFuncs) == 0 {
minHighestPriorityScoreFunc := func(node string) int64 {
// highestPodPriority is the highest priority among the victims on this node.
highestPodPriority := PodPriority(nodesToVictims[node][0].Pod)
// The smaller the highestPodPriority, the higher the score.
return -int64(highestPodPriority)
}
minSumPrioritiesScoreFunc := func(node string) int64 {
var sumPriorities int64
for _, task := range nodesToVictims[node] {
// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
// needed so that a node with a few pods with negative priority is not
// picked over a node with a smaller number of pods with the same negative
// priority (and similar scenarios).
sumPriorities += int64(PodPriority(task.Pod)) + int64(math.MaxInt32+1)
}
// The smaller the sumPriorities, the higher the score.
return -sumPriorities
}
minNumPodsScoreFunc := func(node string) int64 {
// The smaller the length of pods, the higher the score.
return -int64(len(nodesToVictims[node]))
}
latestStartTimeScoreFunc := func(node string) int64 {
// Get the earliest start time of all pods on the current node.
earliestStartTimeOnNode := GetEarliestPodStartTime(nodesToVictims[node])
if earliestStartTimeOnNode == nil {
klog.Error(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
return int64(math.MinInt64)
}
// The bigger the earliestStartTimeOnNode, the higher the score.
return earliestStartTimeOnNode.UnixNano()
}
// Each scoreFunc scores the nodes according to specific rules and keeps the name of the node
// with the highest score. If and only if the scoreFunc has more than one node with the highest
// score, we will execute the other scoreFunc in order of precedence.
scoreFuncs = []func(string) int64{
// A node with a minimum highest priority victim is preferable.
minHighestPriorityScoreFunc,
// A node with the smallest sum of priorities is preferable.
minSumPrioritiesScoreFunc,
// A node with the minimum number of pods is preferable.
minNumPodsScoreFunc,
// A node with the latest start time of all highest priority victims is preferable.
latestStartTimeScoreFunc,
// If there are still ties, then the first Node in the list is selected.
}
}
for _, f := range scoreFuncs {
selectedNodes := []string{}
maxScore := int64(math.MinInt64)
for _, node := range allCandidates {
score := f(node)
if score > maxScore {
maxScore = score
selectedNodes = []string{}
}
if score == maxScore {
selectedNodes = append(selectedNodes, node)
}
}
if len(selectedNodes) == 1 {
return selectedNodes[0]
}
allCandidates = selectedNodes
}
return allCandidates[0]
}
// GetEarliestPodStartTime returns the earliest start time of all pods that
// have the highest priority among all victims.
func GetEarliestPodStartTime(tasks []*api.TaskInfo) *metav1.Time {
if len(tasks) == 0 {
// should not reach here.
klog.Background().Error(nil, "victims.Pods is empty. Should not reach here")
return nil
}
earliestPodStartTime := GetPodStartTime(tasks[0].Pod)
maxPriority := PodPriority(tasks[0].Pod)
for _, task := range tasks {
if podPriority := PodPriority(task.Pod); podPriority == maxPriority {
if podStartTime := GetPodStartTime(task.Pod); podStartTime.Before(earliestPodStartTime) {
earliestPodStartTime = podStartTime
}
} else if podPriority > maxPriority {
maxPriority = podPriority
earliestPodStartTime = GetPodStartTime(task.Pod)
}
}
return earliestPodStartTime
}
// GetPodStartTime returns start time of the given pod or current timestamp
// if it hasn't started yet.
func GetPodStartTime(pod *v1.Pod) *metav1.Time {
if pod.Status.StartTime != nil {
return pod.Status.StartTime
}
// Assumed pods and bound pods that haven't started don't have a StartTime yet.
return &metav1.Time{Time: time.Now()}
}


@ -17,10 +17,13 @@ limitations under the License.
package preempt
import (
"flag"
"testing"
v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/cmd/scheduler/app/options"
@ -29,12 +32,19 @@ import (
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/conformance"
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
"volcano.sh/volcano/pkg/scheduler/plugins/predicates"
"volcano.sh/volcano/pkg/scheduler/plugins/priority"
"volcano.sh/volcano/pkg/scheduler/plugins/proportion"
"volcano.sh/volcano/pkg/scheduler/uthelper"
"volcano.sh/volcano/pkg/scheduler/util"
)
func init() {
options.Default()
klog.InitFlags(nil)
flag.Set("v", "4")
}
func TestPreempt(t *testing.T) {
plugins := map[string]framework.PluginBuilder{
conformance.PluginName: conformance.New,
@ -44,7 +54,6 @@ func TestPreempt(t *testing.T) {
}
highPrio := util.BuildPriorityClass("high-priority", 100000)
lowPrio := util.BuildPriorityClass("low-priority", 10)
options.Default()
tests := []uthelper.TestCommonStruct{
{
@ -173,7 +182,7 @@ func TestPreempt(t *testing.T) {
},
{
// case about issue #2232
Name: "preempt low priority job in same queue",
Name: "preempt low priority job in same queue but not pod with preemptable=false or higher priority",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 1, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
@ -291,7 +300,8 @@ func TestPreempt(t *testing.T) {
test.Plugins = plugins
test.PriClass = []*schedulingv1.PriorityClass{highPrio, lowPrio}
t.Run(test.Name, func(t *testing.T) {
test.RegisterSession(tiers, nil)
test.RegisterSession(tiers, []conf.Configuration{{Name: actions[0].Name(),
Arguments: map[string]interface{}{EnableTopologyAwarePreemptionKey: false}}})
defer test.Close()
test.Run(actions)
if err := test.CheckAll(i); err != nil {
@ -300,3 +310,317 @@ func TestPreempt(t *testing.T) {
})
}
}
func TestTopologyAwarePreempt(t *testing.T) {
plugins := map[string]framework.PluginBuilder{
conformance.PluginName: conformance.New,
gang.PluginName: gang.New,
priority.PluginName: priority.New,
proportion.PluginName: proportion.New,
predicates.PluginName: predicates.New,
}
highPrio := util.BuildPriorityClass("high-priority", 100000)
lowPrio := util.BuildPriorityClass("low-priority", 10)
tests := []uthelper.TestCommonStruct{
{
Name: "do not preempt if there are enough idle resources",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroup("pg1", "c1", "q1", 3, map[string]int32{"": 3}, schedulingv1beta1.PodGroupInqueue),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
},
// If there are enough idle resources on the node, then there is no need to preempt anything.
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, nil),
},
ExpectEvictNum: 0,
},
{
Name: "do not preempt if job is pipelined",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroup("pg1", "c1", "q1", 1, map[string]int32{"": 2}, schedulingv1beta1.PodGroupInqueue),
util.BuildPodGroup("pg2", "c1", "q1", 1, map[string]int32{"": 2}, schedulingv1beta1.PodGroupInqueue),
},
// Both pg1 and pg2 jobs are pipelined, because enough pods are already running.
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
util.BuildPod("c1", "preemptee3", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
util.BuildPod("c1", "preemptor2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
},
// All resources on the node will be in use.
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("3", "3G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, nil),
},
ExpectEvictNum: 0,
},
{
Name: "preempt one task of different job to fit both jobs on one node",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 1, map[string]int32{"": 2}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 2}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "false"}, make(map[string]string)),
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
util.BuildPod("c1", "preemptor2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("2", "2G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, nil),
},
ExpectEvicted: []string{"c1/preemptee1"},
ExpectEvictNum: 1,
},
{
Name: "preempt enough tasks to fit large task of different job",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 1, map[string]int32{"": 3}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
},
// There are 3 cpus and 3G of memory idle and 3 tasks running each consuming 1 cpu and 1G of memory.
// Big task requiring 5 cpus and 5G of memory should preempt 2 of 3 running tasks to fit into the node.
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPod("c1", "preemptee3", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "false"}, make(map[string]string)),
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("5", "5G"), "pg2", make(map[string]string), make(map[string]string)),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("6", "6G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, nil),
},
ExpectEvicted: []string{"c1/preemptee2", "c1/preemptee1"},
ExpectEvictNum: 2,
},
{
// case about #3161
Name: "preempt low priority job in same queue",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("3", "3G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("3", "3G"), "pg2", make(map[string]string), make(map[string]string)),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, api.BuildResourceList("4", "4G")),
},
ExpectEvicted: []string{"c1/preemptee1"},
ExpectEvictNum: 1,
},
{
// case about #3161
Name: "preempt low priority job in same queue: allocatable and has enough resource, don't preempt",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 1, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("3", "3G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("3", "3G"), "pg2", make(map[string]string), make(map[string]string)),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, api.BuildResourceList("6", "6G")),
},
ExpectEvictNum: 0,
},
{
// case about issue #2232
Name: "preempt low priority job in same queue but not pod with preemptable=false or higher priority",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 1, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "false"}, make(map[string]string)),
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPodWithPriority("c1", "preemptee3", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string), &highPrio.Value),
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, api.BuildResourceList("3", "3G")),
},
ExpectEvicted: []string{"c1/preemptee2"},
ExpectEvictNum: 1,
},
{
// case about #3335
Name: "unBestEffort high-priority pod preempt BestEffort low-priority pod in same queue",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, v1.ResourceList{}, "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("3", "3G"), "pg2", make(map[string]string), make(map[string]string)),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "1"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, api.BuildResourceList("6", "6G")),
},
ExpectEvicted: []string{"c1/preemptee1"},
ExpectEvictNum: 1,
},
{
// case about #3335
Name: "BestEffort high-priority pod preempt BestEffort low-priority pod in same queue",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, v1.ResourceList{}, "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPod("c1", "preemptor1", "", v1.PodPending, v1.ResourceList{}, "pg2", make(map[string]string), make(map[string]string)),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "1"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, api.BuildResourceList("6", "6G")),
},
ExpectEvicted: []string{"c1/preemptee1"},
ExpectEvictNum: 1,
},
{
// case about #3642
Name: "can not preempt resources when task preemption policy is never",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, nil, schedulingv1beta1.PodGroupInqueue, "low-priority"),
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, nil, schedulingv1beta1.PodGroupInqueue, "high-priority"),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPodWithPreemptionPolicy("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string), v1.PreemptNever),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("1", "1Gi", []api.ScalarResource{{Name: "pods", Value: "1"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, nil),
},
ExpectEvictNum: 0,
ExpectEvicted:  []string{}, // no victims should be preempted
},
{
Name: "preempt low-priority pod when high-priority pod has PodAntiAffinity conflict",
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
util.BuildPodGroupWithPrio("pg3", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg2", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
buildPodWithPodAntiAffinity("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true", "test": "test"}, make(map[string]string), "kubernetes.io/hostname"),
buildPodWithPodAntiAffinity("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg3", map[string]string{"test": "test"}, make(map[string]string), "kubernetes.io/hostname"),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("2", "2Gi", []api.ScalarResource{{Name: "pods", Value: "2"}}...), map[string]string{"kubernetes.io/hostname": "n1"}),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, api.BuildResourceList("2", "2G")),
},
ExpectEvictNum: 1,
ExpectEvicted: []string{"c1/preemptee2"},
},
}
trueValue := true
tiers := []conf.Tier{
{
Plugins: []conf.PluginOption{
{
Name: conformance.PluginName,
EnabledPreemptable: &trueValue,
},
{
Name: gang.PluginName,
EnabledPreemptable: &trueValue,
EnabledJobPipelined: &trueValue,
EnabledJobStarving: &trueValue,
},
{
Name: priority.PluginName,
EnabledTaskOrder: &trueValue,
EnabledJobOrder: &trueValue,
EnabledPreemptable: &trueValue,
EnabledJobPipelined: &trueValue,
EnabledJobStarving: &trueValue,
},
{
Name: proportion.PluginName,
EnabledOverused: &trueValue,
EnabledAllocatable: &trueValue,
EnabledQueueOrder: &trueValue,
EnabledPredicate: &trueValue,
},
{
Name: predicates.PluginName,
EnabledPreemptable: &trueValue,
EnabledPredicate: &trueValue,
},
},
}}
actions := []framework.Action{New()}
for i, test := range tests {
test.Plugins = plugins
test.PriClass = []*schedulingv1.PriorityClass{highPrio, lowPrio}
t.Run(test.Name, func(t *testing.T) {
test.RegisterSession(tiers, []conf.Configuration{{Name: actions[0].Name(),
Arguments: map[string]interface{}{EnableTopologyAwarePreemptionKey: true}}})
defer test.Close()
test.Run(actions)
if err := test.CheckAll(i); err != nil {
t.Fatal(err)
}
})
}
}
func buildPodWithPodAntiAffinity(name, namespace, node string, phase v1.PodPhase, req v1.ResourceList, groupName string, labels map[string]string, selector map[string]string, topologyKey string) *v1.Pod {
pod := util.BuildPod(name, namespace, node, phase, req, groupName, labels, selector)
pod.Spec.Affinity = &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: labels,
},
TopologyKey: topologyKey,
},
},
},
}
return pod
}

View File

@ -137,7 +137,7 @@ func (ra *Action) Execute(ssn *framework.Session) {
assigned := false
// we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action
totalNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(task)
totalNodes := ssn.FilterOutUnschedulableAndUnresolvableNodesForTask(task)
for _, n := range totalNodes {
// When filtering candidate nodes, need to consider the node statusSets instead of the err information.
// refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422

View File

@ -17,6 +17,8 @@ limitations under the License.
package api
import (
"context"
"errors"
"strings"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
@ -165,10 +167,46 @@ type Status struct {
}
// String represents status string
func (s Status) String() string {
func (s *Status) String() string {
return s.Reason
}
// IsSuccess returns true if and only if "Status" is nil or Code is "Success".
func (s *Status) IsSuccess() bool {
return s == nil || s.Code == Success
}
// IsWait returns true if and only if the Code is "Wait".
func (s *Status) IsWait() bool {
return s.Code == Wait
}
// IsSkip returns true if and only if the Code is "Skip".
func (s *Status) IsSkip() bool {
return s.Code == Skip
}
// AsError returns nil if the status is a success, a wait or a skip; otherwise returns an "error" object
// with a concatenated message on reasons of the Status.
func (s *Status) AsError() error {
if s.IsSuccess() || s.IsWait() || s.IsSkip() {
return nil
}
return errors.New(s.String())
}
// AsStatus wraps an error in a Status.
func AsStatus(err error) *Status {
if err == nil {
return nil
}
return &Status{
Code: Error,
Reason: err.Error(),
}
}
type StatusSets []*Status
func (s StatusSets) ContainsUnschedulable() bool {
@ -320,3 +358,17 @@ type VictimTasksFn func([]*TaskInfo) []*TaskInfo
// AllocatableFn is the func declaration used to check whether the task can be allocated
type AllocatableFn func(*QueueInfo, *TaskInfo) bool
// SimulateRemoveTaskFn is the func declaration used to simulate the result of removing a task from a node.
type SimulateRemoveTaskFn func(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *TaskInfo, taskInfoToRemove *TaskInfo, nodeInfo *NodeInfo) error
// SimulateAddTaskFn is the func declaration used to simulate the result of adding a task to a node.
type SimulateAddTaskFn func(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *TaskInfo, taskInfoToAdd *TaskInfo, nodeInfo *NodeInfo) error
// SimulatePredicateFn is the func declaration used to simulate the predicate check for a task on a node.
// Plugins implement this function to verify whether the task can be scheduled to the node while maintaining topology constraints.
type SimulatePredicateFn func(ctx context.Context, state *k8sframework.CycleState, task *TaskInfo, nodeInfo *NodeInfo) error
// SimulateAllocatableFn is the func declaration used to simulate the allocatable check for a queue.
// Plugins implement this function to verify whether the queue has enough resources to schedule the task while maintaining topology constraints.
type SimulateAllocatableFn func(ctx context.Context, state *k8sframework.CycleState, queue *QueueInfo, task *TaskInfo) bool
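The new *Status helpers mirror the kube-scheduler framework conventions: a nil *Status counts as success, AsStatus wraps an arbitrary error, and AsError converts back, returning nil for success, wait, and skip. A minimal sketch of the round trip (the main harness is illustrative and not part of this change):
// Sketch only: exercising the *Status helpers added above.
package main

import (
	"errors"
	"fmt"

	"volcano.sh/volcano/pkg/scheduler/api"
)

func main() {
	ok := api.AsStatus(nil)                   // nil error -> nil *Status
	bad := api.AsStatus(errors.New("no fit")) // wrapped as Code: Error, Reason: "no fit"

	fmt.Println(ok.IsSuccess(), bad.IsSuccess()) // true false
	fmt.Println(bad.AsError())                   // no fit
}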

View File

@ -59,7 +59,6 @@ import (
"volcano.sh/apis/pkg/apis/scheduling"
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
v1beta1apply "volcano.sh/apis/pkg/client/applyconfiguration/scheduling/v1beta1"
vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/apis/pkg/client/clientset/versioned/scheme"
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
@ -71,7 +70,6 @@ import (
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/metrics"
"volcano.sh/volcano/pkg/scheduler/metrics/source"
"volcano.sh/volcano/pkg/scheduler/util"
commonutil "volcano.sh/volcano/pkg/util"
)
@ -262,6 +260,19 @@ func (de *defaultEvictor) Evict(p *v1.Pod, reason string) error {
klog.V(1).Infof("%+v", pod.Status.Conditions)
return nil
}
condition = &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonPreemptionByScheduler,
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName),
}
if !podutil.UpdatePodCondition(&pod.Status, condition) {
klog.V(1).Infof("UpdatePodCondition: existed condition, not update")
klog.V(1).Infof("%+v", pod.Status.Conditions)
return nil
}
if _, err := de.kubeclient.CoreV1().Pods(p.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Failed to update pod <%v/%v> status: %v", pod.Namespace, pod.Name, err)
return err
@ -345,9 +356,7 @@ func (su *defaultStatusUpdater) UpdateQueueStatus(queue *schedulingapi.QueueInfo
return err
}
queueStatusApply := v1beta1apply.QueueStatus().WithAllocated(newQueue.Status.Allocated)
queueApply := v1beta1apply.Queue(newQueue.Name).WithStatus(queueStatusApply)
_, err := su.vcclient.SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: util.DefaultComponentName})
_, err := su.vcclient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("error occurred in updating Queue <%s>: %s", newQueue.Name, err.Error())
return err

View File

@ -47,6 +47,9 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Con
}
}
}
ssn.InitCycleState()
return ssn
}

View File

@ -38,7 +38,6 @@ import (
"volcano.sh/apis/pkg/apis/scheduling"
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
v1beta1apply "volcano.sh/apis/pkg/client/applyconfiguration/scheduling/v1beta1"
vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
@ -110,17 +109,21 @@ type Session struct {
overusedFns map[string]api.ValidateFn
// preemptiveFns means whether current queue can reclaim from other queue,
// while reclaimableFns means whether current queue's resources can be reclaimed.
preemptiveFns map[string]api.ValidateWithCandidateFn
allocatableFns map[string]api.AllocatableFn
jobReadyFns map[string]api.ValidateFn
jobPipelinedFns map[string]api.VoteFn
jobValidFns map[string]api.ValidateExFn
jobEnqueueableFns map[string]api.VoteFn
jobEnqueuedFns map[string]api.JobEnqueuedFn
targetJobFns map[string]api.TargetJobFn
reservedNodesFns map[string]api.ReservedNodesFn
victimTasksFns map[string][]api.VictimTasksFn
jobStarvingFns map[string]api.ValidateFn
preemptiveFns map[string]api.ValidateWithCandidateFn
allocatableFns map[string]api.AllocatableFn
jobReadyFns map[string]api.ValidateFn
jobPipelinedFns map[string]api.VoteFn
jobValidFns map[string]api.ValidateExFn
jobEnqueueableFns map[string]api.VoteFn
jobEnqueuedFns map[string]api.JobEnqueuedFn
targetJobFns map[string]api.TargetJobFn
reservedNodesFns map[string]api.ReservedNodesFn
victimTasksFns map[string][]api.VictimTasksFn
jobStarvingFns map[string]api.ValidateFn
simulateRemoveTaskFns map[string]api.SimulateRemoveTaskFn
simulateAddTaskFns map[string]api.SimulateAddTaskFn
simulatePredicateFns map[string]api.SimulatePredicateFn
simulateAllocatableFns map[string]api.SimulateAllocatableFn
// cycleStatesMap is used to temporarily store the scheduling status of each pod, its life cycle is same as Session.
// Because state needs to be passed between different extension points (not only used in PreFilter and Filter),
@ -152,34 +155,38 @@ func openSession(cache cache.Cache) *Session {
RevocableNodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},
plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
victimQueueOrderFns: map[string]api.VictimCompareFn{},
taskOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
hyperNodeOrderFns: map[string]api.HyperNodeOrderFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.VoteFn{},
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
targetJobFns: map[string]api.TargetJobFn{},
reservedNodesFns: map[string]api.ReservedNodesFn{},
victimTasksFns: map[string][]api.VictimTasksFn{},
jobStarvingFns: map[string]api.ValidateFn{},
plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
victimQueueOrderFns: map[string]api.VictimCompareFn{},
taskOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
hyperNodeOrderFns: map[string]api.HyperNodeOrderFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.VoteFn{},
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
targetJobFns: map[string]api.TargetJobFn{},
reservedNodesFns: map[string]api.ReservedNodesFn{},
victimTasksFns: map[string][]api.VictimTasksFn{},
jobStarvingFns: map[string]api.ValidateFn{},
simulateRemoveTaskFns: map[string]api.SimulateRemoveTaskFn{},
simulateAddTaskFns: map[string]api.SimulateAddTaskFn{},
simulatePredicateFns: map[string]api.SimulatePredicateFn{},
simulateAllocatableFns: map[string]api.SimulateAllocatableFn{},
}
snapshot := cache.Snapshot()
@ -204,11 +211,14 @@ func openSession(cache cache.Cache) *Session {
ssn.NamespaceInfo = snapshot.NamespaceInfo
// calculate all nodes' resource only once in each schedule cycle, other plugins can clone it when need
for _, n := range ssn.Nodes {
if isNodeUnschedulable(n.Node) || isNodeNotReady(n.Node) {
klog.V(3).Infof("node %s is not ready or unschedulable, need to continue", n.Name)
continue
}
ssn.TotalResource.Add(n.Allocatable)
}
ssn.InitCycleState()
klog.V(3).Infof("Open Session %v with <%d> Job and <%d> Queues",
ssn.UID, len(ssn.Jobs), len(ssn.Queues))
@ -317,9 +327,8 @@ func updateRootQueueResources(ssn *Session, allocated v1.ResourceList) {
}
if !equality.Semantic.DeepEqual(queue.Status.Allocated, allocated) {
queueStatusApply := v1beta1apply.QueueStatus().WithAllocated(allocated)
queueApply := v1beta1apply.Queue(queue.Name).WithStatus(queueStatusApply)
_, err = ssn.VCClient().SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: util.DefaultComponentName})
queue.Status.Allocated = allocated
_, err = ssn.VCClient().SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), queue, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to update root queue status: %s", err.Error())
return
@ -390,8 +399,8 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) scheduling.PodGroupStatus {
return status
}
// GetUnschedulableAndUnresolvableNodesForTask filter out those node that has UnschedulableAndUnresolvable
func (ssn *Session) GetUnschedulableAndUnresolvableNodesForTask(task *api.TaskInfo) []*api.NodeInfo {
// FilterOutUnschedulableAndUnresolvableNodesForTask filters out nodes that have UnschedulableAndUnresolvable status for the given task
func (ssn *Session) FilterOutUnschedulableAndUnresolvableNodesForTask(task *api.TaskInfo) []*api.NodeInfo {
fitErrors, ok1 := ssn.Jobs[task.Job]
if !ok1 {
return ssn.NodeList

View File

@ -17,6 +17,8 @@ limitations under the License.
package framework
import (
"context"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
"volcano.sh/apis/pkg/apis/scheduling"
@ -160,6 +162,22 @@ func (ssn *Session) AddJobStarvingFns(name string, fn api.ValidateFn) {
ssn.jobStarvingFns[name] = fn
}
func (ssn *Session) AddSimulateAddTaskFn(name string, fn api.SimulateAddTaskFn) {
ssn.simulateAddTaskFns[name] = fn
}
func (ssn *Session) AddSimulateRemoveTaskFn(name string, fn api.SimulateRemoveTaskFn) {
ssn.simulateRemoveTaskFns[name] = fn
}
func (ssn *Session) AddSimulateAllocatableFn(name string, fn api.SimulateAllocatableFn) {
ssn.simulateAllocatableFns[name] = fn
}
func (ssn *Session) AddSimulatePredicateFn(name string, fn api.SimulatePredicateFn) {
ssn.simulatePredicateFns[name] = fn
}
// Reclaimable invoke reclaimable function of the plugins
func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo
@ -670,6 +688,85 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
return nil
}
// SimulateAllocatableFn invoke simulateAllocatableFn function of the plugins
func (ssn *Session) SimulateAllocatableFn(ctx context.Context, state *k8sframework.CycleState, queue *api.QueueInfo, task *api.TaskInfo) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledAllocatable) {
continue
}
caf, found := ssn.simulateAllocatableFns[plugin.Name]
if !found {
continue
}
if !caf(ctx, state, queue, task) {
return false
}
}
}
return true
}
// SimulatePredicateFn invoke simulatePredicateFn function of the plugins
func (ssn *Session) SimulatePredicateFn(ctx context.Context, state *k8sframework.CycleState, task *api.TaskInfo, node *api.NodeInfo) error {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledPredicate) {
continue
}
pfn, found := ssn.simulatePredicateFns[plugin.Name]
if !found {
continue
}
err := pfn(ctx, state, task, node)
if err != nil {
return err
}
}
}
return nil
}
// SimulateRemoveTaskFn invoke simulateRemoveTaskFn function of the plugins
func (ssn *Session) SimulateRemoveTaskFn(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToRemove *api.TaskInfo, nodeInfo *api.NodeInfo) error {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledPreemptable) && !isEnabled(plugin.EnabledAllocatable) {
continue
}
pfn, found := ssn.simulateRemoveTaskFns[plugin.Name]
if !found {
continue
}
err := pfn(ctx, state, taskToSchedule, taskToRemove, nodeInfo)
if err != nil {
return err
}
}
}
return nil
}
// SimulateAddTaskFn invoke simulateAddTaskFn function of the plugins
func (ssn *Session) SimulateAddTaskFn(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToAdd *api.TaskInfo, nodeInfo *api.NodeInfo) error {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledPreemptable) && !isEnabled(plugin.EnabledAllocatable) {
continue
}
pfn, found := ssn.simulateAddTaskFns[plugin.Name]
if !found {
continue
}
err := pfn(ctx, state, taskToSchedule, taskToAdd, nodeInfo)
if err != nil {
return err
}
}
}
return nil
}
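Taken together, these four wrappers let an action dry-run a preemption against a per-task CycleState before evicting anything. A minimal sketch of that flow, assuming it lives in this framework package and that the caller has already picked a candidate node and a victim list (simulateFits is a hypothetical helper, not the actual preempt action code):
// simulateFits rolls the victims out of the simulated state, then re-checks the
// predicate and the queue's allocatable for the preemptor against that state.
func simulateFits(ctx context.Context, ssn *Session, state *k8sframework.CycleState,
	preemptor *api.TaskInfo, victims []*api.TaskInfo, node *api.NodeInfo) bool {
	for _, victim := range victims {
		if err := ssn.SimulateRemoveTaskFn(ctx, state, preemptor, victim, node); err != nil {
			return false
		}
	}
	if err := ssn.SimulatePredicateFn(ctx, state, preemptor, node); err != nil {
		return false
	}
	queue := ssn.Queues[ssn.Jobs[preemptor.Job].Queue]
	return ssn.SimulateAllocatableFn(ctx, state, queue, preemptor)
}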
// PrePredicateFn invoke predicate function of the plugins
func (ssn *Session) PrePredicateFn(task *api.TaskInfo) error {
for _, tier := range ssn.Tiers {

View File

@ -124,7 +124,7 @@ func TestFilterOutPreemptMayNotHelpNodes(t *testing.T) {
}
// check potential nodes
potentialNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(task)
potentialNodes := ssn.FilterOutUnschedulableAndUnresolvableNodesForTask(task)
want := test.want[task.UID]
got := make([]string, 0, len(potentialNodes))
for _, node := range potentialNodes {

View File

@ -262,3 +262,22 @@ func (nl *NodeLister) List() ([]*v1.Node, error) {
}
return nodes, nil
}
func isNodeUnschedulable(node *v1.Node) bool {
if node == nil {
return true
}
return node.Spec.Unschedulable
}
func isNodeNotReady(node *v1.Node) bool {
if node == nil {
return true
}
for _, cond := range node.Status.Conditions {
if cond.Type == v1.NodeReady {
return cond.Status != v1.ConditionTrue
}
}
return true
}

View File

@ -0,0 +1,100 @@
package framework
import (
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func makeNode(unschedulable bool, readyCondition v1.ConditionStatus, nodeIsNil bool) *v1.Node {
if nodeIsNil {
return nil
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
},
Spec: v1.NodeSpec{
Unschedulable: unschedulable,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: readyCondition,
},
},
},
}
return node
}
func makeNodeWithoutReady() *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-no-ready",
},
Spec: v1.NodeSpec{
Unschedulable: false,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{},
},
}
}
func TestIsNodeUnschedulable(t *testing.T) {
tests := []struct {
name string
input bool
expected bool
nodeIsNil bool
}{
{"unschedulable true", true, true, false},
{"unschedulable false", false, false, false},
{"node is nil", true, true, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
node := makeNode(tt.input, v1.ConditionTrue, tt.nodeIsNil)
result := isNodeUnschedulable(node)
if result != tt.expected {
t.Errorf("expected %v, got %v", tt.expected, result)
}
})
}
}
func TestIsNodeNotReady(t *testing.T) {
tests := []struct {
name string
ready v1.ConditionStatus
expected bool
nodeIsNil bool
}{
{"NodeReady=True", v1.ConditionTrue, false, false},
{"NodeReady=False", v1.ConditionFalse, true, false},
{"NodeReady=Unknown", v1.ConditionUnknown, true, false},
{"node is nil", "", true, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
node := makeNode(false, tt.ready, false)
result := isNodeNotReady(node)
if result != tt.expected {
t.Errorf("expected %v, got %v", tt.expected, result)
}
})
}
t.Run("No NodeReady condition", func(t *testing.T) {
node := makeNodeWithoutReady()
result := isNodeNotReady(node)
if result != true {
t.Errorf("expected true when no NodeReady condition, got %v", result)
}
})
}

View File

@ -17,11 +17,13 @@ limitations under the License.
package capacity
import (
"context"
"fmt"
"math"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
"volcano.sh/apis/pkg/apis/scheduling"
@ -33,8 +35,12 @@ import (
)
const (
PluginName = "capacity"
rootQueueID = "root"
PluginName = "capacity"
// capacityStateKey is the key in CycleState under which the capacity plugin stores its pre-computed queue state.
// Using the name of the plugin will likely help us avoid collisions with other plugins.
capacityStateKey = PluginName
rootQueueID = "root"
)
type capacityPlugin struct {
@ -231,6 +237,96 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
return util.Permit
})
ssn.AddPrePredicateFn(cp.Name(), func(task *api.TaskInfo) error {
state := &capacityState{
queueAttrs: make(map[api.QueueID]*queueAttr),
}
for _, queue := range cp.queueOpts {
state.queueAttrs[queue.queueID] = queue.Clone()
}
ssn.GetCycleState(task.UID).Write(capacityStateKey, state)
return nil
})
ssn.AddSimulateAddTaskFn(cp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToAdd *api.TaskInfo, nodeInfo *api.NodeInfo) error {
state, err := getCapacityState(cycleState)
if err != nil {
return fmt.Errorf("failed to get capacity state: %w", err)
}
job := ssn.Jobs[taskToAdd.Job]
attr := state.queueAttrs[job.Queue]
if attr == nil {
return fmt.Errorf("queue %s not found", job.Queue)
}
attr.allocated.Add(taskToAdd.Resreq)
updateQueueAttrShare(attr)
if hierarchyEnabled {
for _, ancestorID := range attr.ancestors {
ancestorAttr := state.queueAttrs[ancestorID]
ancestorAttr.allocated.Add(taskToAdd.Resreq)
}
}
return nil
})
ssn.AddSimulateRemoveTaskFn(cp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToRemove *api.TaskInfo, nodeInfo *api.NodeInfo) error {
state, err := getCapacityState(cycleState)
if err != nil {
return fmt.Errorf("failed to get capacity state: %w", err)
}
job := ssn.Jobs[taskToRemove.Job]
attr := state.queueAttrs[job.Queue]
if attr == nil {
return fmt.Errorf("queue %s not found", job.Queue)
}
attr.allocated.Sub(taskToRemove.Resreq)
updateQueueAttrShare(attr)
if hierarchyEnabled {
for _, ancestorID := range attr.ancestors {
ancestorAttr := state.queueAttrs[ancestorID]
ancestorAttr.allocated.Sub(taskToRemove.Resreq)
}
}
return nil
})
ssn.AddSimulateAllocatableFn(cp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, queue *api.QueueInfo, candidate *api.TaskInfo) bool {
state, err := getCapacityState(cycleState)
if err != nil {
return false
}
if !readyToSchedule {
klog.V(3).Infof("Capacity plugin failed to check queue's hierarchical structure!")
return false
}
if hierarchyEnabled && !cp.isLeafQueue(queue.UID) {
klog.V(3).Infof("Queue <%s> is not a leaf queue, can not allocate task <%s>.", queue.Name, candidate.Name)
return false
}
simulateQueueAllocatable := func(state *capacityState, queue *api.QueueInfo, candidate *api.TaskInfo) bool {
attr := state.queueAttrs[queue.UID]
return queueAllocatable(attr, candidate, queue)
}
list := append(state.queueAttrs[queue.UID].ancestors, queue.UID)
for i := len(list) - 1; i >= 0; i-- {
if !simulateQueueAllocatable(state, ssn.Queues[list[i]], candidate) {
if klog.V(5).Enabled() {
for i--; i >= 0; i-- {
simulateQueueAllocatable(state, ssn.Queues[list[i]], candidate)
}
}
return false
}
}
return true
})
// Register event handlers.
ssn.AddEventHandler(&framework.EventHandler{
AllocateFunc: func(event *framework.Event) {
@ -315,6 +411,7 @@ func (cp *capacityPlugin) buildQueueAttrs(ssn *framework.Session) {
}
realCapability := api.ExceededPart(cp.totalResource, cp.totalGuarantee).Add(attr.guarantee)
if attr.capability == nil {
attr.capability = api.EmptyResource()
attr.realCapability = realCapability
} else {
realCapability.MinDimensionResource(attr.capability, api.Infinity)
@ -683,6 +780,7 @@ func (cp *capacityPlugin) checkHierarchicalQueue(attr *queueAttr) error {
for _, childAttr := range attr.children {
realCapability := api.ExceededPart(attr.realCapability, totalGuarantee).Add(childAttr.guarantee)
if childAttr.capability == nil {
childAttr.capability = api.EmptyResource()
childAttr.realCapability = realCapability
} else {
realCapability.MinDimensionResource(childAttr.capability, api.Infinity)
@ -716,13 +814,7 @@ func (cp *capacityPlugin) checkHierarchicalQueue(attr *queueAttr) error {
}
func (cp *capacityPlugin) updateShare(attr *queueAttr) {
res := float64(0)
for _, rn := range attr.deserved.ResourceNames() {
res = max(res, helpers.Share(attr.allocated.Get(rn), attr.deserved.Get(rn)))
}
attr.share = res
updateQueueAttrShare(attr)
metrics.UpdateQueueShare(attr.name, attr.share)
}
@ -732,6 +824,10 @@ func (cp *capacityPlugin) isLeafQueue(queueID api.QueueID) bool {
func (cp *capacityPlugin) queueAllocatable(queue *api.QueueInfo, candidate *api.TaskInfo) bool {
attr := cp.queueOpts[queue.UID]
return queueAllocatable(attr, candidate, queue)
}
func queueAllocatable(attr *queueAttr, candidate *api.TaskInfo, queue *api.QueueInfo) bool {
futureUsed := attr.allocated.Clone().Add(candidate.Resreq)
allocatable := futureUsed.LessEqualWithDimension(attr.realCapability, candidate.Resreq)
if !allocatable {
@ -805,3 +901,79 @@ func getQueueLevel(l *queueAttr, r *queueAttr) int {
return level
}
func getCapacityState(cycleState *k8sframework.CycleState) (*capacityState, error) {
c, err := cycleState.Read(capacityStateKey)
if err != nil {
// capacity state doesn't exist, likely the PrePredicate callback wasn't invoked.
return nil, fmt.Errorf("error reading %q from cycleState: %w", capacityStateKey, err)
}
s, ok := c.(*capacityState)
if !ok {
return nil, fmt.Errorf("%+v convert to capacity.state error", c)
}
return s, nil
}
type capacityState struct {
queueAttrs map[api.QueueID]*queueAttr
}
func (qa *queueAttr) Clone() *queueAttr {
if qa == nil {
return nil
}
cloned := &queueAttr{
queueID: qa.queueID,
name: qa.name,
share: qa.share,
deserved: qa.deserved.Clone(),
allocated: qa.allocated.Clone(),
request: qa.request.Clone(),
elastic: qa.elastic.Clone(),
inqueue: qa.inqueue.Clone(),
capability: qa.capability.Clone(),
realCapability: qa.realCapability.Clone(),
guarantee: qa.guarantee.Clone(),
children: make(map[api.QueueID]*queueAttr),
}
if len(qa.ancestors) > 0 {
cloned.ancestors = make([]api.QueueID, len(qa.ancestors))
copy(cloned.ancestors, qa.ancestors)
}
for childID, childNode := range qa.children {
cloned.children[childID] = childNode.Clone()
}
return cloned
}
func (s *capacityState) Clone() k8sframework.StateData {
if s == nil {
return nil
}
newState := &capacityState{
queueAttrs: make(map[api.QueueID]*queueAttr, len(s.queueAttrs)),
}
for qID, qa := range s.queueAttrs {
newState.queueAttrs[qID] = qa.Clone()
}
return newState
}
func updateQueueAttrShare(attr *queueAttr) {
res := float64(0)
for _, rn := range attr.deserved.ResourceNames() {
res = max(res, helpers.Share(attr.allocated.Get(rn), attr.deserved.Get(rn)))
}
attr.share = res
}
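The share computed here is the queue's most saturated dimension: with deserved = {cpu: 8, memory: 16Gi} and allocated = {cpu: 4, memory: 12Gi}, for example, share = max(4/8, 12/16) = 0.75. The SimulateAddTaskFn/SimulateRemoveTaskFn callbacks above keep this value in step with the hypothetical allocations, so queue comparisons during a dry run see the same signal the real event handlers would produce.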

View File

@ -71,3 +71,11 @@ type JobReadyRequest struct {
type JobReadyResponse struct {
Status bool `json:"status"`
}
type EventHandlerRequest struct {
Task *api.TaskInfo `json:"task"`
}
type EventHandlerResponse struct {
ErrorMessage string `json:"errorMessage"`
}

View File

@ -58,6 +58,10 @@ const (
ExtenderJobEnqueueableVerb = "extender.jobEnqueueableVerb"
// ExtenderJobReadyVerb is the verb of JobReady method
ExtenderJobReadyVerb = "extender.jobReadyVerb"
// ExtenderAllocateFuncVerb is the verb of AllocateFunc method
ExtenderAllocateFuncVerb = "extender.allocateFuncVerb"
// ExtenderDeallocateFuncVerb is the verb of DeallocateFunc method
ExtenderDeallocateFuncVerb = "extender.deallocateFuncVerb"
// ExtenderIgnorable indicates whether the extender can ignore unexpected errors
ExtenderIgnorable = "extender.ignorable"
@ -77,6 +81,8 @@ type extenderConfig struct {
queueOverusedVerb string
jobEnqueueableVerb string
jobReadyVerb string
allocateFuncVerb string
deallocateFuncVerb string
ignorable bool
}
@ -123,6 +129,8 @@ func parseExtenderConfig(arguments framework.Arguments) *extenderConfig {
ec.queueOverusedVerb, _ = arguments[ExtenderQueueOverusedVerb].(string)
ec.jobEnqueueableVerb, _ = arguments[ExtenderJobEnqueueableVerb].(string)
ec.jobReadyVerb, _ = arguments[ExtenderJobReadyVerb].(string)
ec.allocateFuncVerb, _ = arguments[ExtenderAllocateFuncVerb].(string)
ec.deallocateFuncVerb, _ = arguments[ExtenderDeallocateFuncVerb].(string)
arguments.GetBool(&ec.ignorable, ExtenderIgnorable)
@ -289,6 +297,8 @@ func (ep *extenderPlugin) OnSessionOpen(ssn *framework.Session) {
return resp.Status
})
}
addEventHandler(ssn, ep)
}
func (ep *extenderPlugin) OnSessionClose(ssn *framework.Session) {
@ -330,3 +340,48 @@ func (ep *extenderPlugin) send(action string, args interface{}, result interface
}
return nil
}
func addEventHandler(ssn *framework.Session, ep *extenderPlugin) {
const (
AllocateFunc = "AllocateFunc"
DeallocateFunc = "DeallocateFunc"
)
eventHandlerFunc := func(funcName string) func(event *framework.Event) {
return func(event *framework.Event) {
if event == nil {
klog.Errorf("%s event nil.", funcName)
return
}
resp := &EventHandlerResponse{}
var verb string
switch funcName {
case AllocateFunc:
verb = ep.config.allocateFuncVerb
case DeallocateFunc:
verb = ep.config.deallocateFuncVerb
}
err := ep.send(verb, &EventHandlerRequest{Task: event.Task}, resp)
if err != nil {
klog.Warningf("%s failed with error %v", funcName, err)
if !ep.config.ignorable {
event.Err = err
}
}
if resp.ErrorMessage != "" {
event.Err = errors.New(resp.ErrorMessage)
}
}
}
var eventHandler framework.EventHandler
if ep.config.allocateFuncVerb != "" {
eventHandler.AllocateFunc = eventHandlerFunc(AllocateFunc)
}
if ep.config.deallocateFuncVerb != "" {
eventHandler.DeallocateFunc = eventHandlerFunc(DeallocateFunc)
}
ssn.AddEventHandler(&eventHandler)
}
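For an external extender, the two new verbs map to HTTP endpoints that receive the task of each allocate/deallocate event and can veto it through ErrorMessage. A minimal sketch of such a server (the route name, port, and accept-everything behaviour are assumptions; only the request/response shapes come from the EventHandlerRequest/EventHandlerResponse types added in this change):
// Hypothetical extender server handling the allocateFuncVerb endpoint.
package main

import (
	"encoding/json"
	"net/http"

	"volcano.sh/volcano/pkg/scheduler/api"
)

type eventHandlerRequest struct {
	Task *api.TaskInfo `json:"task"`
}

type eventHandlerResponse struct {
	ErrorMessage string `json:"errorMessage"`
}

func main() {
	http.HandleFunc("/allocateTask", func(w http.ResponseWriter, r *http.Request) {
		var req eventHandlerRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// Accept the allocation; set ErrorMessage to reject it and fail the event.
		_ = json.NewEncoder(w).Encode(eventHandlerResponse{})
	})
	_ = http.ListenAndServe(":8080", nil)
}
Setting extender.allocateFuncVerb to the route above (alongside the extender's usual URL prefix argument) would then invoke it for every allocation event, mirroring existing verbs such as jobReadyVerb.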

View File

@ -434,6 +434,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
// If the filtering logic is added to the Prefile node in the Volumebinding package in the future,
// the processing logic needs to be added to the return value result.
if predicate.podAffinityEnable {
klog.Infof("Executing podAffinityFilter PreFilter for task %s/%s", task.Namespace, task.Name)
_, status := podAffinityFilter.PreFilter(context.TODO(), state, task.Pod)
if err := handleSkipPrePredicatePlugin(status, state, task, interpodaffinity.Name); err != nil {
return err
@ -590,12 +591,10 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
status := podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
podAffinityStatus := api.ConvertPredicateStatus(status)
if podAffinityStatus.Code != api.Success {
// TODO: Currently, preemption is not supported when Pod affinity filtering fails.
// Once supported, the logic here should be removed.
// See https://github.com/volcano-sh/volcano/issues/3845
podAffinityStatus.Code = api.UnschedulableAndUnresolvable
predicateStatus = append(predicateStatus, podAffinityStatus)
return api.NewFitErrWithStatus(task, node, predicateStatus...)
if ShouldAbort(podAffinityStatus) {
return api.NewFitErrWithStatus(task, node, predicateStatus...)
}
}
}
}
@ -711,6 +710,71 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
})
ssn.RegisterBinder(pp.Name(), pp)
// Add SimulateAddTask function
ssn.AddSimulateAddTaskFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToAdd *api.TaskInfo, nodeInfo *api.NodeInfo) error {
podInfoToAdd, err := k8sframework.NewPodInfo(taskToAdd.Pod)
if err != nil {
return fmt.Errorf("failed to create pod info: %w", err)
}
k8sNodeInfo := k8sframework.NewNodeInfo(nodeInfo.Pods()...)
k8sNodeInfo.SetNode(nodeInfo.Node)
if predicate.podAffinityEnable {
isSkipInterPodAffinity := handleSkipPredicatePlugin(cycleState, podAffinityFilter.Name())
if !isSkipInterPodAffinity {
status := podAffinityFilter.AddPod(ctx, cycleState, taskToSchedule.Pod, podInfoToAdd, k8sNodeInfo)
if !status.IsSuccess() {
return fmt.Errorf("failed to remove pod from node %s: %w", nodeInfo.Name, status.AsError())
}
}
}
return nil
})
// Add SimulateRemoveTask function
ssn.AddSimulateRemoveTaskFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToRemove *api.TaskInfo, nodeInfo *api.NodeInfo) error {
podInfoToRemove, err := k8sframework.NewPodInfo(taskToRemove.Pod)
if err != nil {
return fmt.Errorf("failed to create pod info: %w", err)
}
k8sNodeInfo := k8sframework.NewNodeInfo(nodeInfo.Pods()...)
k8sNodeInfo.SetNode(nodeInfo.Node)
if predicate.podAffinityEnable {
isSkipInterPodAffinity := handleSkipPredicatePlugin(cycleState, podAffinityFilter.Name())
if !isSkipInterPodAffinity {
status := podAffinityFilter.RemovePod(ctx, cycleState, taskToSchedule.Pod, podInfoToRemove, k8sNodeInfo)
if !status.IsSuccess() {
return fmt.Errorf("failed to remove pod from node %s: %w", nodeInfo.Name, status.AsError())
}
}
}
return nil
})
// Add SimulatePredicate function
ssn.AddSimulatePredicateFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, task *api.TaskInfo, node *api.NodeInfo) error {
k8sNodeInfo := k8sframework.NewNodeInfo(node.Pods()...)
k8sNodeInfo.SetNode(node.Node)
if predicate.podAffinityEnable {
isSkipInterPodAffinity := handleSkipPredicatePlugin(cycleState, podAffinityFilter.Name())
if !isSkipInterPodAffinity {
status := podAffinityFilter.Filter(ctx, cycleState, task.Pod, k8sNodeInfo)
if !status.IsSuccess() {
return fmt.Errorf("failed to filter pod on node %s: %w", node.Name, status.AsError())
} else {
klog.Infof("pod affinity for task %s/%s filter success on node %s", task.Namespace, task.Name, node.Name)
}
}
}
return nil
})
}
func (pp *predicatesPlugin) runReservePlugins(ssn *framework.Session, event *framework.Event) {

View File

@ -253,7 +253,8 @@ func TestPodAntiAffinity(t *testing.T) {
}
test.PriClass = []*schedulingv1.PriorityClass{highPrio, lowPrio}
t.Run(test.Name, func(t *testing.T) {
test.RegisterSession(tiers, nil)
test.RegisterSession(tiers, []conf.Configuration{{Name: actions[1].Name(),
Arguments: map[string]interface{}{preempt.EnableTopologyAwarePreemptionKey: true}}})
defer test.Close()
test.Run(actions)
if err := test.CheckAll(i); err != nil {

View File

@ -17,11 +17,14 @@ limitations under the License.
package proportion
import (
"context"
"fmt"
"math"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/klog/v2"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/volcano/pkg/scheduler/api"
@ -32,7 +35,10 @@ import (
)
// PluginName indicates name of volcano scheduler plugin.
const PluginName = "proportion"
const (
PluginName = "proportion"
proportionStateKey = "proportionState"
)
type proportionPlugin struct {
totalResource *api.Resource
@ -119,6 +125,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
}
realCapability := api.ExceededPart(pp.totalResource, pp.totalGuarantee).Add(attr.guarantee)
if attr.capability == nil {
attr.capability = api.EmptyResource()
attr.realCapability = realCapability
} else {
realCapability.MinDimensionResource(attr.capability, api.Infinity)
@ -318,12 +325,49 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
ssn.AddAllocatableFn(pp.Name(), func(queue *api.QueueInfo, candidate *api.TaskInfo) bool {
return queueAllocatable(queue, candidate)
})
ssn.AddSimulateAllocatableFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, queue *api.QueueInfo, candidate *api.TaskInfo) bool {
state, err := getProportionState(cycleState)
if err != nil {
klog.Errorf("getProportionState error: %v", err)
return false
}
attr := state.queueAttrs[queue.UID]
if attr == nil {
klog.Errorf("queue %v not found", queue.Name)
return false
}
futureUsed := attr.allocated.Clone().Add(candidate.Resreq)
allocatable := futureUsed.LessEqualWithDimension(attr.deserved, candidate.Resreq)
if !allocatable {
klog.V(3).Infof("Queue <%v>: deserved <%v>, allocated <%v>; Candidate <%v>: resource request <%v>",
queue.Name, attr.deserved, attr.allocated, candidate.Name, candidate.Resreq)
}
return allocatable
})
ssn.AddPreemptiveFn(pp.Name(), func(obj interface{}, candidate interface{}) bool {
queue := obj.(*api.QueueInfo)
task := candidate.(*api.TaskInfo)
return queueAllocatable(queue, task)
})
ssn.AddPrePredicateFn(pp.Name(), func(task *api.TaskInfo) error {
state := &proportionState{
queueAttrs: make(map[api.QueueID]*queueAttr),
}
for _, queue := range pp.queueOpts {
state.queueAttrs[queue.queueID] = queue.Clone()
}
ssn.GetCycleState(task.UID).Write(proportionStateKey, state)
return nil
})
ssn.AddJobEnqueueableFn(pp.Name(), func(obj interface{}) int {
job := obj.(*api.JobInfo)
queueID := job.Queue
@ -364,6 +408,38 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
return util.Reject
})
ssn.AddSimulateAddTaskFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToAdd *api.TaskInfo, nodeInfo *api.NodeInfo) error {
state, err := getProportionState(cycleState)
if err != nil {
return fmt.Errorf("failed to get capacity state: %w", err)
}
job := ssn.Jobs[taskToAdd.Job]
attr := state.queueAttrs[job.Queue]
if attr == nil {
return fmt.Errorf("queue %s not found", job.Queue)
}
attr.allocated.Add(taskToAdd.Resreq)
updateQueueAttrShare(attr)
return nil
})
ssn.AddSimulateRemoveTaskFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToRemove *api.TaskInfo, nodeInfo *api.NodeInfo) error {
state, err := getProportionState(cycleState)
if err != nil {
return fmt.Errorf("failed to get capacity state: %w", err)
}
job := ssn.Jobs[taskToRemove.Job]
attr := state.queueAttrs[job.Queue]
if attr == nil {
return fmt.Errorf("queue %s not found", job.Queue)
}
attr.allocated.Sub(taskToRemove.Resreq)
updateQueueAttrShare(attr)
return nil
})
// Register event handlers.
ssn.AddEventHandler(&framework.EventHandler{
AllocateFunc: func(event *framework.Event) {
@ -398,6 +474,69 @@ func (pp *proportionPlugin) OnSessionClose(ssn *framework.Session) {
}
func (pp *proportionPlugin) updateShare(attr *queueAttr) {
updateQueueAttrShare(attr)
metrics.UpdateQueueShare(attr.name, attr.share)
}
type proportionState struct {
queueAttrs map[api.QueueID]*queueAttr
}
func (qa *queueAttr) Clone() *queueAttr {
if qa == nil {
return nil
}
// Clone basic attributes
return &queueAttr{
queueID: qa.queueID,
name: qa.name,
weight: qa.weight,
share: qa.share,
deserved: qa.deserved.Clone(),
allocated: qa.allocated.Clone(),
request: qa.request.Clone(),
elastic: qa.elastic.Clone(),
inqueue: qa.inqueue.Clone(),
capability: qa.capability.Clone(),
realCapability: qa.realCapability.Clone(),
guarantee: qa.guarantee.Clone(),
}
}
func (s *proportionState) Clone() k8sframework.StateData {
if s == nil {
return nil
}
newState := &proportionState{
queueAttrs: make(map[api.QueueID]*queueAttr, len(s.queueAttrs)),
}
// Clone all queue attributes
for qID, qa := range s.queueAttrs {
newState.queueAttrs[qID] = qa.Clone()
}
return newState
}
func getProportionState(cycleState *k8sframework.CycleState) (*proportionState, error) {
c, err := cycleState.Read(proportionStateKey)
if err != nil {
// proportion state doesn't exist, likely the PrePredicate callback wasn't invoked.
return nil, fmt.Errorf("error reading %q from cycleState: %w", proportionStateKey, err)
}
s, ok := c.(*proportionState)
if !ok {
return nil, fmt.Errorf("%+v convert to proportion.state error", c)
}
return s, nil
}
func updateQueueAttrShare(attr *queueAttr) {
res := float64(0)
// TODO(k82cn): how to handle fragment issues?
@ -409,5 +548,4 @@ func (pp *proportionPlugin) updateShare(attr *queueAttr) {
}
attr.share = res
metrics.UpdateQueueShare(attr.name, attr.share)
}

View File

@ -43,6 +43,9 @@ func BuildNode(name string, alloc v1.ResourceList, labels map[string]string) *v1
Status: v1.NodeStatus{
Capacity: alloc,
Allocatable: alloc,
Conditions: []v1.NodeCondition{
{Type: v1.NodeReady, Status: v1.ConditionTrue},
},
},
}
}

View File

@ -24,6 +24,7 @@ import (
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
@ -456,4 +457,103 @@ var _ = Describe("Job E2E Test", func() {
Expect(err).NotTo(HaveOccurred())
})
It("should preempt low priority pod with anti-affinity constraint", func() {
// Remove enqueue action first because it conflicts with preempt.
cmc := e2eutil.NewConfigMapCase("volcano-system", "integration-scheduler-configmap")
modifier := func(sc *e2eutil.SchedulerConfiguration) bool {
newActions := strings.TrimPrefix(sc.Actions, "enqueue, ")
if newActions == sc.Actions {
klog.Warning("There is already no enqueue action")
return false
}
sc.Configurations = append(sc.Configurations, e2eutil.Configuration{
Name: "preempt",
Arguments: map[string]string{
"enableTopologyAwarePreemption": "true",
},
})
sc.Actions = newActions
return true
}
cmc.ChangeBy(func(data map[string]string) (changed bool, changedBefore map[string]string) {
return e2eutil.ModifySchedulerConfig(data, modifier)
})
defer cmc.UndoChanged()
ctx = e2eutil.InitTestContext(e2eutil.Options{
PriorityClasses: map[string]int32{
highPriority: highPriorityValue,
lowPriority: lowPriorityValue,
},
})
slot := e2eutil.OneCPU
rep := e2eutil.ClusterSize(ctx, slot)
// Create low priority pod with specific label
lowPriorityLabels := map[string]string{
"app": "test-app",
}
job := &e2eutil.JobSpec{
Tasks: []e2eutil.TaskSpec{
{
Img: e2eutil.DefaultNginxImage,
Req: slot,
Min: 1,
Rep: rep,
Labels: map[string]string{
schedulingv1beta1.PodPreemptable: "true",
"app": "test-app",
},
},
},
}
job.Name = "low-priority-job"
job.Pri = lowPriority
lowPriorityJob := e2eutil.CreateJob(ctx, job)
err := e2eutil.WaitTasksReady(ctx, lowPriorityJob, int(rep))
Expect(err).NotTo(HaveOccurred())
// Create high priority pod with anti-affinity rule
highPriorityJob := &e2eutil.JobSpec{
Tasks: []e2eutil.TaskSpec{
{
Img: e2eutil.DefaultNginxImage,
Req: slot,
Min: rep / 2,
Rep: rep,
Labels: map[string]string{
schedulingv1beta1.PodPreemptable: "true",
},
Affinity: &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: lowPriorityLabels,
},
TopologyKey: "kubernetes.io/hostname",
},
},
},
},
},
},
}
highPriorityJob.Name = "high-priority-job"
highPriorityJob.Pri = highPriority
highPriorityJobCreated := e2eutil.CreateJob(ctx, highPriorityJob)
// Verify high priority pod is successfully scheduled
err = e2eutil.WaitTasksReady(ctx, highPriorityJobCreated, int(rep)/2)
Expect(err).NotTo(HaveOccurred())
// Verify low priority pod is preempted
err = e2eutil.WaitTasksReady(ctx, lowPriorityJob, int(rep)/2)
Expect(err).NotTo(HaveOccurred())
})
})