Compare commits
22 Commits
Author | SHA1 | Date |
---|---|---|
|
4ac9bc0ffd | |
|
1098a08122 | |
|
ddecb9a75a | |
|
aa439233cc | |
|
b858207728 | |
|
7c3720380e | |
|
7febf4dff7 | |
|
3c52b43750 | |
|
3aa260cf38 | |
|
e9040d33a3 | |
|
3506a7089b | |
|
6e2959db6b | |
|
a3d19d4bea | |
|
50895c2c36 | |
|
d940019b9c | |
|
88e22aab4e | |
|
c7efd55dd9 | |
|
b505c1b310 | |
|
adab22d06a | |
|
fb07f675fd | |
|
284eaed827 | |
|
57c2d11654 |
|
@ -11,7 +11,7 @@ jobs:
|
|||
e2e_scheduling_actions:
|
||||
runs-on: ubuntu-24.04
|
||||
name: E2E about Scheduling Actions
|
||||
timeout-minutes: 40
|
||||
timeout-minutes: 50
|
||||
steps:
|
||||
- name: Install Go
|
||||
uses: actions/setup-go@v5
|
||||
|
|
|
@ -11,7 +11,7 @@ jobs:
|
|||
e2e_scheduling_basic:
|
||||
runs-on: ubuntu-24.04
|
||||
name: E2E about Basic Scheduling
|
||||
timeout-minutes: 40
|
||||
timeout-minutes: 50
|
||||
steps:
|
||||
- name: Install Go
|
||||
uses: actions/setup-go@v5
|
||||
|
|
|
@ -11,7 +11,7 @@ jobs:
|
|||
e2e_sequence:
|
||||
runs-on: ubuntu-24.04
|
||||
name: E2E about Sequence
|
||||
timeout-minutes: 40
|
||||
timeout-minutes: 50
|
||||
steps:
|
||||
- name: Install Go
|
||||
uses: actions/setup-go@v5
|
||||
|
|
|
@ -11,7 +11,7 @@ jobs:
|
|||
e2e_vcctl:
|
||||
runs-on: ubuntu-24.04
|
||||
name: E2E about Volcano CLI
|
||||
timeout-minutes: 20
|
||||
timeout-minutes: 50
|
||||
steps:
|
||||
- name: Install Go
|
||||
uses: actions/setup-go@v5
|
||||
|
|
|
@ -189,6 +189,7 @@ Please follow the guide [Volcano Dashboard](https://github.com/volcano-sh/dashbo
|
|||
| Volcano v1.9 | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |- |_ |- |
|
||||
| Volcano v1.10 | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ |_ |- |
|
||||
| Volcano v1.11 | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ |✓ |- |
|
||||
| Volcano v1.12 | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ |✓ |✓ |
|
||||
| Volcano HEAD (master) | - | - | - | - | - | - | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |✓ |✓ |✓ |
|
||||
|
||||
Key:
|
||||
|
|
Binary file not shown.
After Width: | Height: | Size: 87 KiB |
Binary file not shown.
After Width: | Height: | Size: 182 KiB |
|
@ -0,0 +1,98 @@
|
|||
# Preempt Action Support Topology
|
||||
|
||||
## Motivation
|
||||
|
||||
In cloud-native task scheduling scenarios, preemption is a key feature to ensure timely scheduling of high-priority tasks. Compared to the K8s scheduler, Volcano's current preemption implementation is relatively simple, especially in handling affinity judgments. To improve the accuracy and efficiency of the preemption mechanism, the existing implementation needs to be optimized, particularly in supporting topology awareness.
|
||||
|
||||
## In Scope
|
||||
|
||||
- Optimize Volcano's preemption mechanism to support affinity judgments
|
||||
- Improve single Pod preemption process
|
||||
- Implement simulation scheduling interface to ensure simulated addition and removal of pods won't cause topology changes
|
||||
|
||||
## Out of Scope
|
||||
|
||||
- Gang scheduling preemption scenario optimization
|
||||
|
||||
## User Stories
|
||||
|
||||
### Story 1
|
||||
|
||||
As a cluster administrator, I want the system to accurately judge Pod affinity constraints during preemption scheduling to avoid scheduling failures caused by topology changes.
|
||||
|
||||
### Story 2
|
||||
|
||||
As a user, I expect high-priority Pod preemption to minimize impact on existing Pods while maintaining consistency of affinity rules.
|
||||
|
||||
### Story 3
|
||||
|
||||
When topology-sensitive resources like GPUs exist, the preemption process needs to consider resource topology relationships to ensure resource allocation after preemption still satisfies original topology constraints.
|
||||
|
||||
For example, if a node has 2 GPUs (8GB each), Pod A and Pod B each use 4GB, and Pod C needs 8GB. When Pod C needs to be scheduled, it triggers the preemption mechanism. During the simulation scheduling process, the system will try to preempt Pod A and reschedule it. There are two possible scenarios:
|
||||
|
||||
1. If topology changes during simulation scheduling:
|
||||
|
||||
- System chooses to preempt Pod A
|
||||
- The predicate function check successfully for Pod C
|
||||
- When simulating the re-addition of Pod A, the binpack strategy causes Pod A to be scheduled to a different GPU
|
||||
- After re-adding Pod A, the predicate function check still passes for Pod C
|
||||
- This means Pod C can be scheduled without actually removing Pod A
|
||||
- Therefore, the preemption is considered unnecessary and fails
|
||||
2. If topology remains consistent during simulation scheduling:
|
||||
|
||||
- System chooses to preempt Pod A
|
||||
- The predicate function check successfully for Pod C
|
||||
- When simulating the re-addition of Pod A, the original topology relationship is maintained
|
||||
- After re-adding Pod A, the predicate function check fails for Pod C
|
||||
- This confirms that Pod A must be removed for Pod C to be scheduled
|
||||
- Therefore, the preemption is considered necessary and succeeds
|
||||
|
||||
Therefore, when implementing the preemption mechanism, it's crucial to verify the necessity of preemption by checking if the topology changes during pod re-addition would affect the scheduling of the preempting pod.
|
||||
|
||||

|
||||
|
||||
## Design Detail
|
||||
|
||||
### Preemption Process
|
||||
|
||||

|
||||
|
||||
1. Execute Predicate on all nodes that are not UnschedulableAndUnresolvable to obtain candidate node list, and perform parallel simulation scheduling on all candidate nodes.
|
||||
2. The simulation scheduling process for each node is as follows:
|
||||
|
||||
1. First consider Pods with lower priority as potential victims on the node
|
||||
2. Sort the victim list (lower priority and non-PDB-violating victims come first)
|
||||
3. Remove victims in order, add each removed one to eviction candidates, and observe if the verification function passes
|
||||
4. Verification function: Try to add pods (pipelined) with higher priority targeting the current node, observe if they can pass predicate; then remove them and observe if they can pass predicate
|
||||
5. If passed, try to add back the previous eviction candidates in PDB and priority order (to minimize impact), calling verification function after each addition; if verification fails, add to final eviction list
|
||||
6. If final eviction list is not empty, return it
|
||||
3. Sort filtered nodes using PreemptNodeOrderFn
|
||||
4. Schedule Pod to the top-ranked node, evict victims list, and cancel nominatedNodeName of lower priority pods that had nominated this node, moving them from pipeline to pending schedule
|
||||
|
||||
### Key Function Modifications
|
||||
|
||||
- `SimulateRemoveTaskFn`: Simulate the removal of a task from a node, plugins implement this function to ensure the removal action does not cause topology changes
|
||||
|
||||
```go
|
||||
type SimulateRemoveTaskFn func(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *TaskInfo, taskInfoToRemove *TaskInfo, nodeInfo *NodeInfo) error
|
||||
```
|
||||
- `SimulateAddTaskFn`: Simulate the addition of a task to a node, plugins implement this function to ensure the addition action does not cause topology changes
|
||||
|
||||
```go
|
||||
type SimulateAddTaskFn func(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *TaskInfo, taskInfoToAdd *TaskInfo, nodeInfo *NodeInfo) error
|
||||
```
|
||||
- `SimulatePredicateFn`: Simulate the predicate check for a task on a node, plugins implement this function to verify if the task can be scheduled to the node while maintaining topology constraints
|
||||
|
||||
```go
|
||||
type SimulatePredicateFn func(ctx context.Context, state *k8sframework.CycleState, task *TaskInfo, nodeInfo *NodeInfo) error
|
||||
```
|
||||
- `SimulateAllocatableFn`: Simulate the allocatable check for a node, plugins implement this function to verify if the queue has enough resources to schedule the task while maintaining topology constraints
|
||||
|
||||
```go
|
||||
type SimulateAllocatableFn func(ctx context.Context, state *k8sframework.CycleState, queue *QueueInfo, task *TaskInfo) bool
|
||||
```
|
||||
|
||||
### Limitations
|
||||
|
||||
- Current design focuses on single pod preemption scenarios. Does not handle complex topology changes in gang scheduling
|
||||
- For complex combinations of affinity rules, multiple attempts may be needed to find the optimal solution. Performance impact of simulation scheduling needs to be evaluated in large-scale clusters
|
|
@ -1,25 +1,63 @@
|
|||
# Volcano vGPU User guide
|
||||
# Volcano vGPU User Guide
|
||||
|
||||
## Prerequisites
|
||||
## Background Knowledge of GPU Sharing Modes in Volcano
|
||||
|
||||
The list of prerequisites for running the Volcano device plugin is described below:
|
||||
* NVIDIA drivers > 440
|
||||
* nvidia-docker version > 2.0 (see how to [install](https://github.com/NVIDIA/nvidia-docker) and it's [prerequisites](https://github.com/nvidia/nvidia-docker/wiki/Installation-\(version-2.0\)#prerequisites))
|
||||
* docker configured with nvidia as the [default runtime](https://github.com/NVIDIA/nvidia-docker/wiki/Advanced-topics#default-runtime).
|
||||
* Kubernetes version >= 1.16
|
||||
* Volcano verison >= 1.9
|
||||
Volcano supports **two GPU sharing modes** for virtual GPU (vGPU) scheduling:
|
||||
|
||||
## Environment setup
|
||||
### 1. HAMI-core (Software-based vGPU)
|
||||
|
||||
### Install volcano
|
||||
**Description**:
|
||||
Leverages **VCUDA**, a CUDA API hijacking technique to enforce GPU core and memory usage limits, enabling **software-level virtual GPU slicing**.
|
||||
|
||||
Refer to [Install Guide](../../installer/README.md) to install volcano.
|
||||
**Use case**:
|
||||
Ideal for environments requiring **fine-grained GPU sharing**. Compatible with all GPU types.
|
||||
|
||||
After installed, update the scheduler configuration:
|
||||
---
|
||||
|
||||
```shell script
|
||||
kubectl edit cm -n volcano-system volcano-scheduler-configmap
|
||||
### 2. Dynamic MIG (Hardware-level GPU Slicing)
|
||||
|
||||
**Description**:
|
||||
Utilizes **NVIDIA's MIG (Multi-Instance GPU)** technology to partition a physical GPU into isolated instances with **hardware-level performance guarantees**.
|
||||
|
||||
**Use case**:
|
||||
Best for **performance-sensitive** workloads. Requires **MIG-capable GPUs** (e.g., A100, H100).
|
||||
|
||||
---
|
||||
|
||||
GPU Sharing mode is a node configuration. Volcano supports heterogeneous cluster(i.e a part of node uses HAMi-core while another part uses dynamic MIG), See [volcano-vgpu-device-plugin](https://github.com/Project-HAMi/volcano-vgpu-device-plugin) for configuration and details.
|
||||
|
||||
## Installation
|
||||
|
||||
To enable vGPU scheduling, the following components must be set up based on the selected mode:
|
||||
|
||||
### Common Requirements
|
||||
|
||||
* **Prerequisites**:
|
||||
|
||||
* NVIDIA driver > 440
|
||||
* nvidia-docker > 2.0
|
||||
* Docker configured with `nvidia` as the default runtime
|
||||
* Kubernetes >= 1.16
|
||||
* Volcano >= 1.9
|
||||
|
||||
* **Install Volcano**:
|
||||
|
||||
* Follow instructions in [Volcano Installer Guide](https://github.com/volcano-sh/volcano?tab=readme-ov-file#quick-start-guide)
|
||||
|
||||
* **Install Device Plugin**:
|
||||
|
||||
* Deploy [`volcano-vgpu-device-plugin`](https://github.com/Project-HAMi/volcano-vgpu-device-plugin)
|
||||
|
||||
**Note:** the [vgpu device plugin yaml](https://github.com/Project-HAMi/volcano-vgpu-device-plugin/blob/main/volcano-vgpu-device-plugin.yml) also includes the ***Node GPU mode*** and the ***MIG geometry*** configuration. Please refer to the [vgpu device plugin config](https://github.com/Project-HAMi/volcano-vgpu-device-plugin/blob/main/doc/config.md).
|
||||
|
||||
* **Validate Setup**:
|
||||
Ensure node allocatable resources include:
|
||||
|
||||
```yaml
|
||||
volcano.sh/vgpu-memory: "89424"
|
||||
volcano.sh/vgpu-number: "8"
|
||||
```
|
||||
* **Scheduler Config Update**:
|
||||
|
||||
```yaml
|
||||
kind: ConfigMap
|
||||
|
@ -32,106 +70,121 @@ data:
|
|||
actions: "enqueue, allocate, backfill"
|
||||
tiers:
|
||||
- plugins:
|
||||
- name: priority
|
||||
- name: gang
|
||||
- name: conformance
|
||||
- plugins:
|
||||
- name: drf
|
||||
- name: predicates
|
||||
- name: deviceshare
|
||||
arguments:
|
||||
deviceshare.VGPUEnable: true # enable vgpu
|
||||
- name: predicates
|
||||
- name: proportion
|
||||
- name: nodeorder
|
||||
- name: binpack
|
||||
deviceshare.VGPUEnable: true # enable vgpu plugin
|
||||
deviceshare.SchedulePolicy: binpack # scheduling policy. binpack / spread
|
||||
```
|
||||
|
||||
### Install Volcano device plugin
|
||||
Check with:
|
||||
|
||||
Please refer to [volcano vgpu device plugin](https://github.com/Project-HAMi/volcano-vgpu-device-plugin?tab=readme-ov-file#enabling-gpu-support-in-kubernetes)
|
||||
|
||||
### Verify environment is ready
|
||||
|
||||
Check the node status, it is ok if `volcano.sh/vgpu-memory` and `volcano.sh/vgpu-number` are included in the allocatable resources.
|
||||
|
||||
```shell script
|
||||
$ kubectl get node {node name} -oyaml
|
||||
...
|
||||
status:
|
||||
addresses:
|
||||
- address: 172.17.0.3
|
||||
type: InternalIP
|
||||
- address: volcano-control-plane
|
||||
type: Hostname
|
||||
allocatable:
|
||||
cpu: "4"
|
||||
ephemeral-storage: 123722704Ki
|
||||
hugepages-1Gi: "0"
|
||||
hugepages-2Mi: "0"
|
||||
memory: 8174332Ki
|
||||
pods: "110"
|
||||
volcano.sh/vgpu-memory: "89424"
|
||||
volcano.sh/vgpu-number: "8" # GPU resource
|
||||
capacity:
|
||||
cpu: "4"
|
||||
ephemeral-storage: 123722704Ki
|
||||
hugepages-1Gi: "0"
|
||||
hugepages-2Mi: "0"
|
||||
memory: 8174332Ki
|
||||
pods: "110"
|
||||
volcano.sh/vgpu-memory: "89424"
|
||||
volcano.sh/vgpu-number: "8" # GPU resource
|
||||
```bash
|
||||
kubectl get node {node-name} -o yaml
|
||||
```
|
||||
|
||||
### Running GPU Sharing Jobs
|
||||
---
|
||||
|
||||
VGPU can be requested by both set "volcano.sh/vgpu-number" , "volcano.sh/vgpu-cores" and "volcano.sh/vgpu-memory" in resource.limit
|
||||
### HAMI-core Usage
|
||||
|
||||
```shell script
|
||||
$ cat <<EOF | kubectl apply -f -
|
||||
apiVersion: v1
|
||||
kind: Pod
|
||||
* **Pod Spec**:
|
||||
|
||||
```yaml
|
||||
metadata:
|
||||
name: gpu-pod1
|
||||
name: hami-pod
|
||||
annotations:
|
||||
volcano.sh/vgpu-mode: "hami-core"
|
||||
spec:
|
||||
schedulerName: volcano
|
||||
containers:
|
||||
- name: cuda-container
|
||||
image: nvidia/cuda:9.0-devel
|
||||
command: ["sleep"]
|
||||
args: ["100000"]
|
||||
resources:
|
||||
limits:
|
||||
volcano.sh/vgpu-number: 2 # requesting 2 gpu cards
|
||||
volcano.sh/vgpu-memory: 3000 # (optinal)each vGPU uses 3G device memory
|
||||
volcano.sh/vgpu-cores: 50 # (optional)each vGPU uses 50% core
|
||||
EOF
|
||||
- name: cuda-container
|
||||
image: nvidia/cuda:9.0-devel
|
||||
resources:
|
||||
limits:
|
||||
volcano.sh/vgpu-number: 1 # requesting 1 gpu cards
|
||||
volcano.sh/vgpu-cores: 50 # (optional)each vGPU uses 50%
|
||||
volcano.sh/vgpu-memory: 3000 # (optional)each vGPU uses 3G GPU memory
|
||||
```
|
||||
|
||||
You can validate device memory using nvidia-smi inside container:
|
||||
---
|
||||
|
||||

|
||||
### Dynamic MIG Usage
|
||||
|
||||
> **WARNING:** *if you don't request GPUs when using the device plugin with NVIDIA images all
|
||||
> the GPUs on the machine will be exposed inside your container.
|
||||
> The number of vgpu used by a container can not exceed the number of gpus on that node.*
|
||||
* **Enable MIG Mode**:
|
||||
|
||||
### Monitor
|
||||
If you need to use MIG (Multi-Instance GPU), you must run the following command on the GPU node.
|
||||
|
||||
volcano-scheduler-metrics records every GPU usage and limitation, visit the following address to get these metrics.
|
||||
|
||||
```
|
||||
curl {volcano scheduler cluster ip}:8080/metrics
|
||||
```bash
|
||||
sudo nvidia-smi -mig 1
|
||||
```
|
||||
|
||||
You can also collect the **GPU utilization**, **GPU memory usage**, **pods' GPU memory limitations** and **pods' GPU memory usage** metrics on nodes by visiting the following addresses:
|
||||
* **Geometry Config (Optional)**:
|
||||
The volcano-vgpu-device-plugin automatically generates an initial MIG configuration, which is stored in the `volcano-vgpu-device-config` ConfigMap under the `kube-system` namespace. You can customize this configuration as needed. For more details, refer to the [vgpu device plugin yaml](https://github.com/Project-HAMi/volcano-vgpu-device-plugin/blob/main/volcano-vgpu-device-plugin.yml).
|
||||
|
||||
* **Pod Spec with MIG Annotation**:
|
||||
|
||||
```yaml
|
||||
metadata:
|
||||
name: mig-pod
|
||||
annotations:
|
||||
volcano.sh/vgpu-mode: "mig"
|
||||
spec:
|
||||
schedulerName: volcano
|
||||
containers:
|
||||
- name: cuda-container
|
||||
image: nvidia/cuda:9.0-devel
|
||||
resources:
|
||||
limits:
|
||||
volcano.sh/vgpu-number: 1
|
||||
volcano.sh/vgpu-memory: 3000
|
||||
```
|
||||
curl {volcano device plugin pod ip}:9394/metrics
|
||||
|
||||
Note: Actual memory allocated depends on best-fit MIG slice (e.g., request 3GB → 5GB slice used).
|
||||
|
||||
---
|
||||
|
||||
## Scheduler Mode Selection
|
||||
|
||||
* **Explicit Mode**:
|
||||
|
||||
* Use annotation `volcano.sh/vgpu-mode` to force hami-core or MIG mode.
|
||||
* If annotation is absent, scheduler selects mode based on resource fit and policy.
|
||||
|
||||
* **Scheduling Policy**:
|
||||
|
||||
* Modes like `binpack` or `spread` influence node selection.
|
||||
|
||||
---
|
||||
|
||||
## Summary Table
|
||||
|
||||
| Mode | Isolation | MIG GPU Required | Annotation | Core/Memory Control | Recommended For |
|
||||
| ----------- | ---------------- | ---------------- | ---------- | ------------------- | -------------------------- |
|
||||
| HAMI-core | Software (VCUDA) | No | No | Yes | General workloads |
|
||||
| Dynamic MIG | Hardware | Yes | Yes | MIG-controlled | Performance-sensitive jobs |
|
||||
|
||||
---
|
||||
|
||||
## Monitoring
|
||||
|
||||
* **Scheduler Metrics**:
|
||||
|
||||
```bash
|
||||
curl http://<volcano-scheduler-ip>:8080/metrics
|
||||
```
|
||||

|
||||
|
||||
# Issues and Contributing
|
||||
* **Device Plugin Metrics**:
|
||||
|
||||
```bash
|
||||
curl http://<plugin-pod-ip>:9394/metrics
|
||||
```
|
||||
|
||||
Metrics include GPU utilization, pod memory usage, and limits.
|
||||
|
||||
---
|
||||
|
||||
## Issues and Contributions
|
||||
|
||||
* File bugs: [Volcano Issues](https://github.com/volcano-sh/volcano/issues)
|
||||
* Contribute: [Pull Requests Guide](https://help.github.com/articles/using-pull-requests/)
|
||||
|
||||
* You can report a bug by [filing a new issue](https://github.com/volcano-sh/volcano/issues)
|
||||
* You can contribute by opening a [pull request](https://help.github.com/articles/using-pull-requests/)
|
2
go.mod
2
go.mod
|
@ -49,7 +49,7 @@ require (
|
|||
sigs.k8s.io/controller-runtime v0.13.0
|
||||
sigs.k8s.io/yaml v1.4.0
|
||||
stathat.com/c/consistent v1.0.0
|
||||
volcano.sh/apis v1.11.1-0.20250526091449-59e96f6abe4f
|
||||
volcano.sh/apis v1.12.1
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
4
go.sum
4
go.sum
|
@ -528,5 +528,5 @@ sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
|
|||
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
|
||||
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
|
||||
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
|
||||
volcano.sh/apis v1.11.1-0.20250526091449-59e96f6abe4f h1:3CLus5h9cv42O6VgbJxA2bFskauMQJKMxJhKTKSdGNc=
|
||||
volcano.sh/apis v1.11.1-0.20250526091449-59e96f6abe4f/go.mod h1:0XNNnIOevJSYNiXRmwhXUrYCcCcWcBeTY0nxrlkk03A=
|
||||
volcano.sh/apis v1.12.1 h1:yq5dVj/g21vnWObCIKsJKPhMoThpzDrHDD/GMouYVxk=
|
||||
volcano.sh/apis v1.12.1/go.mod h1:0XNNnIOevJSYNiXRmwhXUrYCcCcWcBeTY0nxrlkk03A=
|
||||
|
|
|
@ -156,31 +156,31 @@ case ${E2E_TYPE} in
|
|||
;;
|
||||
"JOBP")
|
||||
echo "Running parallel job e2e suite..."
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --nodes=4 --compilers=4 --randomize-all --randomize-suites --fail-on-pending --cover --trace --race --slow-spec-threshold='30s' --progress ./test/e2e/jobp/
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --nodes=4 --compilers=4 --randomize-all --randomize-suites --fail-on-pending --cover --trace --race --slow-spec-threshold='30s' --progress ./test/e2e/jobp/
|
||||
;;
|
||||
"JOBSEQ")
|
||||
echo "Running sequence job e2e suite..."
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress ./test/e2e/jobseq/
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress ./test/e2e/jobseq/
|
||||
;;
|
||||
"SCHEDULINGBASE")
|
||||
echo "Running scheduling base e2e suite..."
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress ./test/e2e/schedulingbase/
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress ./test/e2e/schedulingbase/
|
||||
;;
|
||||
"SCHEDULINGACTION")
|
||||
echo "Running scheduling action e2e suite..."
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress ./test/e2e/schedulingaction/
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress ./test/e2e/schedulingaction/
|
||||
;;
|
||||
"VCCTL")
|
||||
echo "Running vcctl e2e suite..."
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress ./test/e2e/vcctl/
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress ./test/e2e/vcctl/
|
||||
;;
|
||||
"STRESS")
|
||||
echo "Running stress e2e suite..."
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress ./test/e2e/stress/
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress ./test/e2e/stress/
|
||||
;;
|
||||
"DRA")
|
||||
echo "Running dra e2e suite..."
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -r --slow-spec-threshold='30s' --progress --focus="DRA E2E Test" ./test/e2e/dra/
|
||||
KUBECONFIG=${KUBECONFIG} GOOS=${OS} ginkgo -v -r --slow-spec-threshold='30s' --progress --focus="DRA E2E Test" ./test/e2e/dra/
|
||||
;;
|
||||
esac
|
||||
|
||||
|
|
|
@ -210,9 +210,6 @@ rules:
|
|||
- apiGroups: [""]
|
||||
resources: [ "nodes", "nodes/status" ]
|
||||
verbs: [ "get", "list", "watch", "update", "patch" ]
|
||||
- apiGroups: [ "" ]
|
||||
resources: [ "secrets" ]
|
||||
verbs: [ "get", "list", "watch" ]
|
||||
- apiGroups: [ "" ]
|
||||
resources: [ "configmaps" ]
|
||||
verbs: [ "get", "list", "watch", "create", "update" ]
|
||||
|
|
|
@ -91,7 +91,7 @@ rules:
|
|||
verbs: ["get", "list", "watch", "create", "delete", "update"]
|
||||
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
|
||||
resources: ["queues/status"]
|
||||
verbs: ["patch"]
|
||||
verbs: ["update"]
|
||||
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
|
||||
resources: ["podgroups"]
|
||||
verbs: ["list", "watch", "update"]
|
||||
|
|
|
@ -19,9 +19,6 @@ rules:
|
|||
- apiGroups: [""]
|
||||
resources: [ "nodes", "nodes/status" ]
|
||||
verbs: [ "get", "list", "watch", "update", "patch" ]
|
||||
- apiGroups: [ "" ]
|
||||
resources: [ "secrets" ]
|
||||
verbs: [ "get", "list", "watch" ]
|
||||
- apiGroups: [ "" ]
|
||||
resources: [ "configmaps" ]
|
||||
verbs: [ "get", "list", "watch", "create", "update" ]
|
||||
|
|
|
@ -4665,7 +4665,7 @@ rules:
|
|||
verbs: ["get", "list", "watch", "create", "delete", "update"]
|
||||
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
|
||||
resources: ["queues/status"]
|
||||
verbs: ["patch"]
|
||||
verbs: ["update"]
|
||||
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
|
||||
resources: ["podgroups"]
|
||||
verbs: ["list", "watch", "update"]
|
||||
|
|
|
@ -17,10 +17,23 @@ limitations under the License.
|
|||
package preempt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
k8sutil "k8s.io/kubernetes/pkg/scheduler/util"
|
||||
|
||||
"volcano.sh/volcano/pkg/scheduler/api"
|
||||
"volcano.sh/volcano/pkg/scheduler/conf"
|
||||
|
@ -29,13 +42,37 @@ import (
|
|||
"volcano.sh/volcano/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
const (
|
||||
EnableTopologyAwarePreemptionKey = "enableTopologyAwarePreemption"
|
||||
|
||||
TopologyAwarePreemptWorkerNumKey = "topologyAwarePreemptWorkerNum"
|
||||
|
||||
MinCandidateNodesPercentageKey = "minCandidateNodesPercentage"
|
||||
MinCandidateNodesAbsoluteKey = "minCandidateNodesAbsolute"
|
||||
MaxCandidateNodesAbsoluteKey = "maxCandidateNodesAbsolute"
|
||||
)
|
||||
|
||||
type Action struct {
|
||||
ssn *framework.Session
|
||||
|
||||
enablePredicateErrorCache bool
|
||||
|
||||
enableTopologyAwarePreemption bool
|
||||
|
||||
topologyAwarePreemptWorkerNum int
|
||||
minCandidateNodesPercentage int
|
||||
minCandidateNodesAbsolute int
|
||||
maxCandidateNodesAbsolute int
|
||||
}
|
||||
|
||||
func New() *Action {
|
||||
return &Action{
|
||||
enablePredicateErrorCache: true,
|
||||
enablePredicateErrorCache: true,
|
||||
enableTopologyAwarePreemption: false,
|
||||
topologyAwarePreemptWorkerNum: 16,
|
||||
minCandidateNodesPercentage: 10,
|
||||
minCandidateNodesAbsolute: 1,
|
||||
maxCandidateNodesAbsolute: 100,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -48,6 +85,12 @@ func (pmpt *Action) Initialize() {}
|
|||
func (pmpt *Action) parseArguments(ssn *framework.Session) {
|
||||
arguments := framework.GetArgOfActionFromConf(ssn.Configurations, pmpt.Name())
|
||||
arguments.GetBool(&pmpt.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
|
||||
arguments.GetBool(&pmpt.enableTopologyAwarePreemption, EnableTopologyAwarePreemptionKey)
|
||||
arguments.GetInt(&pmpt.topologyAwarePreemptWorkerNum, TopologyAwarePreemptWorkerNumKey)
|
||||
arguments.GetInt(&pmpt.minCandidateNodesPercentage, MinCandidateNodesPercentageKey)
|
||||
arguments.GetInt(&pmpt.minCandidateNodesAbsolute, MinCandidateNodesAbsoluteKey)
|
||||
arguments.GetInt(&pmpt.maxCandidateNodesAbsolute, MaxCandidateNodesAbsoluteKey)
|
||||
pmpt.ssn = ssn
|
||||
}
|
||||
|
||||
func (pmpt *Action) Execute(ssn *framework.Session) {
|
||||
|
@ -234,17 +277,28 @@ func (pmpt *Action) preempt(
|
|||
return false, err
|
||||
}
|
||||
|
||||
assigned := false
|
||||
|
||||
if err := ssn.PrePredicateFn(preemptor); err != nil {
|
||||
return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
|
||||
}
|
||||
|
||||
predicateFn := ssn.PredicateForPreemptAction
|
||||
// we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action
|
||||
allNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(preemptor)
|
||||
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, pmpt.enablePredicateErrorCache)
|
||||
allNodes := ssn.FilterOutUnschedulableAndUnresolvableNodesForTask(preemptor)
|
||||
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateForPreemptAction, pmpt.enablePredicateErrorCache)
|
||||
|
||||
if pmpt.enableTopologyAwarePreemption {
|
||||
return pmpt.topologyAwarePreempt(ssn, stmt, preemptor, filter, predicateNodes)
|
||||
}
|
||||
|
||||
return pmpt.normalPreempt(ssn, stmt, preemptor, filter, predicateNodes)
|
||||
}
|
||||
|
||||
func (pmpt *Action) normalPreempt(
|
||||
ssn *framework.Session,
|
||||
stmt *framework.Statement,
|
||||
preemptor *api.TaskInfo,
|
||||
filter func(*api.TaskInfo) bool,
|
||||
predicateNodes []*api.NodeInfo,
|
||||
) (bool, error) {
|
||||
nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
|
||||
|
||||
selectedNodes := util.SortNodes(nodeScores)
|
||||
|
@ -256,6 +310,8 @@ func (pmpt *Action) preempt(
|
|||
|
||||
currentQueue := ssn.Queues[job.Queue]
|
||||
|
||||
assigned := false
|
||||
|
||||
for _, node := range selectedNodes {
|
||||
klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
|
||||
preemptor.Namespace, preemptor.Name, node.Name)
|
||||
|
@ -340,5 +396,578 @@ func (pmpt *Action) taskEligibleToPreempt(preemptor *api.TaskInfo) error {
|
|||
return fmt.Errorf("not eligible to preempt other tasks due to preemptionPolicy is Never")
|
||||
}
|
||||
|
||||
nomNodeName := preemptor.Pod.Status.NominatedNodeName
|
||||
if len(nomNodeName) > 0 {
|
||||
nodeInfo, ok := pmpt.ssn.Nodes[nomNodeName]
|
||||
if !ok {
|
||||
return fmt.Errorf("not eligible due to the pod's nominated node is not found in the session")
|
||||
}
|
||||
|
||||
err := pmpt.ssn.PredicateFn(preemptor, nodeInfo)
|
||||
if err == nil {
|
||||
return fmt.Errorf("not eligible due to the pod's nominated node is already schedulable, which should not happen as preemption means no node is schedulable")
|
||||
}
|
||||
|
||||
fitError, ok := err.(*api.FitError)
|
||||
if !ok {
|
||||
return fmt.Errorf("not eligible due to the predicate returned a non-FitError error, the error is: %v", err)
|
||||
}
|
||||
|
||||
// If the pod's nominated node is considered as UnschedulableAndUnresolvable by the predicate,
|
||||
// then the pod should be considered for preempting again.
|
||||
if fitError.Status.ContainsUnschedulableAndUnresolvable() {
|
||||
return nil
|
||||
}
|
||||
|
||||
preemptorPodPriority := PodPriority(preemptor.Pod)
|
||||
for _, p := range nodeInfo.Pods() {
|
||||
if PodPriority(p) < preemptorPodPriority && podTerminatingByPreemption(p) {
|
||||
// There is a terminating pod on the nominated node.
|
||||
return fmt.Errorf("not eligible due to a terminating pod caused by preemption on the nominated node")
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pmpt *Action) topologyAwarePreempt(
|
||||
ssn *framework.Session,
|
||||
stmt *framework.Statement,
|
||||
preemptor *api.TaskInfo,
|
||||
filter func(*api.TaskInfo) bool,
|
||||
predicateNodes []*api.NodeInfo,
|
||||
) (bool, error) {
|
||||
// Find all preemption candidates.
|
||||
candidates, nodeToStatusMap, err := pmpt.findCandidates(preemptor, filter, predicateNodes, stmt)
|
||||
if err != nil && len(candidates) == 0 {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Return error when there are no candidates that fit the pod.
|
||||
if len(candidates) == 0 {
|
||||
// Specify nominatedNodeName to clear the pod's nominatedNodeName status, if applicable.
|
||||
return false, fmt.Errorf("no candidates that fit the pod, the status of the nodes are %v", nodeToStatusMap)
|
||||
}
|
||||
|
||||
// Find the best candidate.
|
||||
bestCandidate := SelectCandidate(candidates)
|
||||
if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
|
||||
return false, fmt.Errorf("no candidate node for preemption")
|
||||
}
|
||||
|
||||
if status := prepareCandidate(bestCandidate, preemptor.Pod, stmt, ssn); !status.IsSuccess() {
|
||||
return false, fmt.Errorf("failed to prepare candidate: %v", status)
|
||||
}
|
||||
|
||||
if err := stmt.Pipeline(preemptor, bestCandidate.Name(), true); err != nil {
|
||||
klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
|
||||
preemptor.Namespace, preemptor.Name, bestCandidate.Name())
|
||||
if rollbackErr := stmt.UnPipeline(preemptor); rollbackErr != nil {
|
||||
klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.",
|
||||
preemptor.UID, bestCandidate.Name(), ssn.UID, rollbackErr)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (pmpt *Action) findCandidates(preemptor *api.TaskInfo, filter func(*api.TaskInfo) bool, predicateNodes []*api.NodeInfo, stmt *framework.Statement) ([]*candidate, map[string]api.Status, error) {
|
||||
if len(predicateNodes) == 0 {
|
||||
klog.V(3).Infof("No nodes are eligible to preempt task %s/%s", preemptor.Namespace, preemptor.Name)
|
||||
return nil, nil, nil
|
||||
}
|
||||
klog.Infof("the predicateNodes number is %d", len(predicateNodes))
|
||||
|
||||
nodeToStatusMap := make(map[string]api.Status)
|
||||
|
||||
offset, numCandidates := pmpt.GetOffsetAndNumCandidates(len(predicateNodes))
|
||||
|
||||
candidates, nodeStatuses, err := pmpt.DryRunPreemption(preemptor, predicateNodes, offset, numCandidates, filter, stmt)
|
||||
for node, nodeStatus := range nodeStatuses {
|
||||
nodeToStatusMap[node] = nodeStatus
|
||||
}
|
||||
|
||||
return candidates, nodeToStatusMap, err
|
||||
}
|
||||
|
||||
// prepareCandidate evicts the victim pods before nominating the selected candidate
|
||||
func prepareCandidate(c *candidate, pod *v1.Pod, stmt *framework.Statement, ssn *framework.Session) *api.Status {
|
||||
for _, victim := range c.Victims() {
|
||||
klog.V(3).Infof("Try to preempt Task <%s/%s> for Task <%s/%s>",
|
||||
victim.Namespace, victim.Name, pod.Namespace, pod.Name)
|
||||
if err := stmt.Evict(victim, "preempt"); err != nil {
|
||||
klog.Errorf("Failed to preempt Task <%s/%s> for Task <%s/%s>: %v",
|
||||
victim.Namespace, victim.Name, pod.Namespace, pod.Name, err)
|
||||
return api.AsStatus(err)
|
||||
}
|
||||
}
|
||||
|
||||
metrics.RegisterPreemptionAttempts()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// podTerminatingByPreemption returns true if the pod is in the termination state caused by preempt action.
|
||||
func podTerminatingByPreemption(p *v1.Pod) bool {
|
||||
if p.DeletionTimestamp == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, condition := range p.Status.Conditions {
|
||||
if condition.Type == v1.DisruptionTarget {
|
||||
return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByScheduler
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// PodPriority returns priority of the given pod.
|
||||
func PodPriority(pod *v1.Pod) int32 {
|
||||
if pod.Spec.Priority != nil {
|
||||
return *pod.Spec.Priority
|
||||
}
|
||||
// When priority of a running pod is nil, it means it was created at a time
|
||||
// that there was no global default priority class and the priority class
|
||||
// name of the pod was empty. So, we resolve to the static default priority.
|
||||
return 0
|
||||
}
|
||||
|
||||
// calculateNumCandidates returns the number of candidates the FindCandidates
|
||||
// method must produce from dry running based on the constraints given by
|
||||
// <minCandidateNodesPercentage> and <minCandidateNodesAbsolute>. The number of
|
||||
// candidates returned will never be greater than <numNodes>.
|
||||
func (pmpt *Action) calculateNumCandidates(numNodes int) int {
|
||||
n := (numNodes * pmpt.minCandidateNodesPercentage) / 100
|
||||
|
||||
if n < pmpt.minCandidateNodesAbsolute {
|
||||
n = pmpt.minCandidateNodesAbsolute
|
||||
}
|
||||
|
||||
if n > pmpt.maxCandidateNodesAbsolute {
|
||||
n = pmpt.maxCandidateNodesAbsolute
|
||||
}
|
||||
|
||||
if n > numNodes {
|
||||
n = numNodes
|
||||
}
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
// offset is used to randomly select a starting point in the potentialNodes array.
|
||||
// This helps distribute the preemption checks across different nodes and avoid
|
||||
// always starting from the beginning of the node list, which could lead to
|
||||
// uneven distribution of preemption attempts.
|
||||
// GetOffsetAndNumCandidates chooses a random offset and calculates the number
|
||||
// of candidates that should be shortlisted for dry running preemption.
|
||||
func (pmpt *Action) GetOffsetAndNumCandidates(numNodes int) (int, int) {
|
||||
return rand.Intn(numNodes), pmpt.calculateNumCandidates(numNodes)
|
||||
}
|
||||
|
||||
func (pmpt *Action) DryRunPreemption(preemptor *api.TaskInfo, potentialNodes []*api.NodeInfo, offset, numCandidates int, filter func(*api.TaskInfo) bool, stmt *framework.Statement) ([]*candidate, map[string]api.Status, error) {
|
||||
candidates := newCandidateList(numCandidates)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
nodeStatuses := make(map[string]api.Status)
|
||||
var statusesLock sync.Mutex
|
||||
var errs []error
|
||||
|
||||
job, found := pmpt.ssn.Jobs[preemptor.Job]
|
||||
if !found {
|
||||
return nil, nil, fmt.Errorf("not found Job %s in Session", preemptor.Job)
|
||||
}
|
||||
|
||||
currentQueue := pmpt.ssn.Queues[job.Queue]
|
||||
|
||||
state := pmpt.ssn.GetCycleState(preemptor.UID)
|
||||
|
||||
checkNode := func(i int) {
|
||||
nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
|
||||
stateCopy := state.Clone()
|
||||
|
||||
victims, status := SelectVictimsOnNode(ctx, stateCopy, preemptor, currentQueue, nodeInfoCopy, pmpt.ssn, filter, stmt)
|
||||
if status.IsSuccess() && len(victims) != 0 {
|
||||
c := &candidate{
|
||||
victims: victims,
|
||||
name: nodeInfoCopy.Name,
|
||||
}
|
||||
candidates.add(c)
|
||||
if candidates.size() >= numCandidates {
|
||||
cancel()
|
||||
}
|
||||
return
|
||||
}
|
||||
if status.IsSuccess() && len(victims) == 0 {
|
||||
status = api.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Name))
|
||||
}
|
||||
statusesLock.Lock()
|
||||
if status.Code == api.Error {
|
||||
errs = append(errs, status.AsError())
|
||||
}
|
||||
nodeStatuses[nodeInfoCopy.Name] = *status
|
||||
statusesLock.Unlock()
|
||||
}
|
||||
|
||||
workqueue.ParallelizeUntil(ctx, pmpt.topologyAwarePreemptWorkerNum, len(potentialNodes), checkNode)
|
||||
return candidates.get(), nodeStatuses, utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
type candidate struct {
|
||||
victims []*api.TaskInfo
|
||||
name string
|
||||
}
|
||||
|
||||
// Victims returns s.victims.
|
||||
func (s *candidate) Victims() []*api.TaskInfo {
|
||||
return s.victims
|
||||
}
|
||||
|
||||
// Name returns s.name.
|
||||
func (s *candidate) Name() string {
|
||||
return s.name
|
||||
}
|
||||
|
||||
type candidateList struct {
|
||||
idx int32
|
||||
items []*candidate
|
||||
}
|
||||
|
||||
func newCandidateList(size int) *candidateList {
|
||||
return &candidateList{idx: -1, items: make([]*candidate, size)}
|
||||
}
|
||||
|
||||
// add adds a new candidate to the internal array atomically.
|
||||
func (cl *candidateList) add(c *candidate) {
|
||||
if idx := atomic.AddInt32(&cl.idx, 1); idx < int32(len(cl.items)) {
|
||||
cl.items[idx] = c
|
||||
}
|
||||
}
|
||||
|
||||
// size returns the number of candidate stored. Note that some add() operations
|
||||
// might still be executing when this is called, so care must be taken to
|
||||
// ensure that all add() operations complete before accessing the elements of
|
||||
// the list.
|
||||
func (cl *candidateList) size() int {
|
||||
n := int(atomic.LoadInt32(&cl.idx) + 1)
|
||||
if n >= len(cl.items) {
|
||||
n = len(cl.items)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// get returns the internal candidate array. This function is NOT atomic and
|
||||
// assumes that all add() operations have been completed.
|
||||
func (cl *candidateList) get() []*candidate {
|
||||
return cl.items[:cl.size()]
|
||||
}
|
||||
|
||||
// SelectVictimsOnNode finds minimum set of pods on the given node that should be preempted in order to make enough room
|
||||
// for "pod" to be scheduled.
|
||||
func SelectVictimsOnNode(
|
||||
ctx context.Context,
|
||||
state *k8sframework.CycleState,
|
||||
preemptor *api.TaskInfo,
|
||||
currentQueue *api.QueueInfo,
|
||||
nodeInfo *api.NodeInfo,
|
||||
ssn *framework.Session,
|
||||
filter func(*api.TaskInfo) bool,
|
||||
stmt *framework.Statement,
|
||||
) ([]*api.TaskInfo, *api.Status) {
|
||||
var potentialVictims []*api.TaskInfo
|
||||
|
||||
removeTask := func(rti *api.TaskInfo) error {
|
||||
err := ssn.SimulateRemoveTaskFn(ctx, state, preemptor, rti, nodeInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := nodeInfo.RemoveTask(rti); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
addTask := func(ati *api.TaskInfo) error {
|
||||
err := ssn.SimulateAddTaskFn(ctx, state, preemptor, ati, nodeInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := nodeInfo.AddTask(ati); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var preemptees []*api.TaskInfo
|
||||
for _, task := range nodeInfo.Tasks {
|
||||
if filter == nil {
|
||||
preemptees = append(preemptees, task.Clone())
|
||||
} else if filter(task) {
|
||||
preemptees = append(preemptees, task.Clone())
|
||||
}
|
||||
}
|
||||
|
||||
klog.V(3).Infof("all preemptees: %v", preemptees)
|
||||
|
||||
allVictims := ssn.Preemptable(preemptor, preemptees)
|
||||
metrics.UpdatePreemptionVictimsCount(len(allVictims))
|
||||
|
||||
if err := util.ValidateVictims(preemptor, nodeInfo, allVictims); err != nil {
|
||||
klog.V(3).Infof("No validated victims on Node <%s>: %v", nodeInfo.Name, err)
|
||||
return nil, api.AsStatus(fmt.Errorf("no validated victims on Node <%s>: %v", nodeInfo.Name, err))
|
||||
}
|
||||
|
||||
klog.V(3).Infof("allVictims: %v", allVictims)
|
||||
|
||||
// Sort potentialVictims by pod priority from high to low, which ensures to
|
||||
// reprieve higher priority pods first.
|
||||
sort.Slice(allVictims, func(i, j int) bool { return k8sutil.MoreImportantPod(allVictims[i].Pod, allVictims[j].Pod) })
|
||||
|
||||
victimsQueue := ssn.BuildVictimsPriorityQueue(allVictims, preemptor)
|
||||
|
||||
for !victimsQueue.Empty() {
|
||||
task := victimsQueue.Pop().(*api.TaskInfo)
|
||||
potentialVictims = append(potentialVictims, task)
|
||||
if err := removeTask(task); err != nil {
|
||||
return nil, api.AsStatus(err)
|
||||
}
|
||||
|
||||
if ssn.SimulateAllocatableFn(ctx, state, currentQueue, preemptor) && preemptor.InitResreq.LessEqual(nodeInfo.FutureIdle(), api.Zero) {
|
||||
if err := ssn.SimulatePredicateFn(ctx, state, preemptor, nodeInfo); err == nil {
|
||||
klog.V(3).Infof("Pod %v/%v can be scheduled on node %v after preempt %v/%v, stop evicting more pods", preemptor.Namespace, preemptor.Name, nodeInfo.Name, task.Namespace, task.Name)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// No potential victims are found, and so we don't need to evaluate the node again since its state didn't change.
|
||||
if len(potentialVictims) == 0 {
|
||||
return nil, api.AsStatus(fmt.Errorf("no preemption victims found for incoming pod"))
|
||||
}
|
||||
|
||||
// If the new pod does not fit after removing all potential victim pods,
|
||||
// we are almost done and this node is not suitable for preemption. The only
|
||||
// condition that we could check is if the "pod" is failing to schedule due to
|
||||
// inter-pod affinity to one or more victims, but we have decided not to
|
||||
// support this case for performance reasons. Having affinity to lower
|
||||
// priority pods is not a recommended configuration anyway.
|
||||
if err := ssn.SimulatePredicateFn(ctx, state, preemptor, nodeInfo); err != nil {
|
||||
return nil, api.AsStatus(fmt.Errorf("failed to predicate pod %s/%s on node %s: %v", preemptor.Namespace, preemptor.Name, nodeInfo.Name, err))
|
||||
}
|
||||
|
||||
var victims []*api.TaskInfo
|
||||
|
||||
klog.V(3).Infof("potentialVictims---: %v, nodeInfo: %v", potentialVictims, nodeInfo.Name)
|
||||
|
||||
// TODO: consider the PDB violation here
|
||||
|
||||
reprievePod := func(pi *api.TaskInfo) (bool, error) {
|
||||
if err := addTask(pi); err != nil {
|
||||
klog.ErrorS(err, "Failed to add task", "task", klog.KObj(pi.Pod))
|
||||
return false, err
|
||||
}
|
||||
|
||||
var fits bool
|
||||
if ssn.SimulateAllocatableFn(ctx, state, currentQueue, preemptor) && preemptor.InitResreq.LessEqual(nodeInfo.FutureIdle(), api.Zero) {
|
||||
err := ssn.SimulatePredicateFn(ctx, state, preemptor, nodeInfo)
|
||||
fits = err == nil
|
||||
}
|
||||
|
||||
if !fits {
|
||||
if err := removeTask(pi); err != nil {
|
||||
return false, err
|
||||
}
|
||||
victims = append(victims, pi)
|
||||
klog.V(3).Info("Pod is a potential preemption victim on node", "pod", klog.KObj(pi.Pod), "node", klog.KObj(nodeInfo.Node))
|
||||
}
|
||||
klog.Infof("reprievePod for task: %v, fits: %v", pi.Name, fits)
|
||||
return fits, nil
|
||||
}
|
||||
|
||||
// Now we try to reprieve non-violating victims.
|
||||
for _, p := range potentialVictims {
|
||||
if _, err := reprievePod(p); err != nil {
|
||||
return nil, api.AsStatus(err)
|
||||
}
|
||||
}
|
||||
|
||||
klog.Infof("victims: %v", victims)
|
||||
|
||||
return victims, &api.Status{
|
||||
Reason: "",
|
||||
}
|
||||
}
|
||||
|
||||
// SelectCandidate chooses the best-fit candidate from given <candidates> and return it.
|
||||
// NOTE: This method is exported for easier testing in default preemption.
|
||||
func SelectCandidate(candidates []*candidate) *candidate {
|
||||
if len(candidates) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(candidates) == 1 {
|
||||
return candidates[0]
|
||||
}
|
||||
|
||||
victimsMap := CandidatesToVictimsMap(candidates)
|
||||
scoreFuncs := OrderedScoreFuncs(victimsMap)
|
||||
candidateNode := pickOneNodeForPreemption(victimsMap, scoreFuncs)
|
||||
|
||||
// Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
|
||||
// preemption plugins that exercise different candidates on the same nominated node.
|
||||
if victims := victimsMap[candidateNode]; victims != nil {
|
||||
return &candidate{
|
||||
victims: victims,
|
||||
name: candidateNode,
|
||||
}
|
||||
}
|
||||
|
||||
// We shouldn't reach here.
|
||||
klog.Error(errors.New("no candidate selected"), "Should not reach here", "candidates", candidates)
|
||||
// To not break the whole flow, return the first candidate.
|
||||
return candidates[0]
|
||||
}
|
||||
|
||||
func CandidatesToVictimsMap(candidates []*candidate) map[string][]*api.TaskInfo {
|
||||
m := make(map[string][]*api.TaskInfo, len(candidates))
|
||||
for _, c := range candidates {
|
||||
m[c.Name()] = c.Victims()
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// TODO: Consider exposing score functions to plugins in the future
|
||||
func OrderedScoreFuncs(nodesToVictims map[string][]*api.TaskInfo) []func(node string) int64 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// pickOneNodeForPreemption chooses one node among the given nodes.
|
||||
// It assumes pods in each map entry are ordered by decreasing priority.
|
||||
// If the scoreFuncs is not empty, It picks a node based on score scoreFuncs returns.
|
||||
// If the scoreFuncs is empty,
|
||||
// It picks a node based on the following criteria:
|
||||
// 1. A node with minimum number of PDB violations.
|
||||
// 2. A node with minimum highest priority victim is picked.
|
||||
// 3. Ties are broken by sum of priorities of all victims.
|
||||
// 4. If there are still ties, node with the minimum number of victims is picked.
|
||||
// 5. If there are still ties, node with the latest start time of all highest priority victims is picked.
|
||||
// 6. If there are still ties, the first such node is picked (sort of randomly).
|
||||
// The 'minNodes1' and 'minNodes2' are being reused here to save the memory
|
||||
// allocation and garbage collection time.
|
||||
func pickOneNodeForPreemption(nodesToVictims map[string][]*api.TaskInfo, scoreFuncs []func(node string) int64) string {
|
||||
if len(nodesToVictims) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
allCandidates := make([]string, 0, len(nodesToVictims))
|
||||
for node := range nodesToVictims {
|
||||
allCandidates = append(allCandidates, node)
|
||||
}
|
||||
|
||||
if len(scoreFuncs) == 0 {
|
||||
minHighestPriorityScoreFunc := func(node string) int64 {
|
||||
// highestPodPriority is the highest priority among the victims on this node.
|
||||
highestPodPriority := PodPriority(nodesToVictims[node][0].Pod)
|
||||
// The smaller the highestPodPriority, the higher the score.
|
||||
return -int64(highestPodPriority)
|
||||
}
|
||||
minSumPrioritiesScoreFunc := func(node string) int64 {
|
||||
var sumPriorities int64
|
||||
for _, task := range nodesToVictims[node] {
|
||||
// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
|
||||
// needed so that a node with a few pods with negative priority is not
|
||||
// picked over a node with a smaller number of pods with the same negative
|
||||
// priority (and similar scenarios).
|
||||
sumPriorities += int64(PodPriority(task.Pod)) + int64(math.MaxInt32+1)
|
||||
}
|
||||
// The smaller the sumPriorities, the higher the score.
|
||||
return -sumPriorities
|
||||
}
|
||||
minNumPodsScoreFunc := func(node string) int64 {
|
||||
// The smaller the length of pods, the higher the score.
|
||||
return -int64(len(nodesToVictims[node]))
|
||||
}
|
||||
latestStartTimeScoreFunc := func(node string) int64 {
|
||||
// Get the earliest start time of all pods on the current node.
|
||||
earliestStartTimeOnNode := GetEarliestPodStartTime(nodesToVictims[node])
|
||||
if earliestStartTimeOnNode == nil {
|
||||
klog.Error(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
|
||||
return int64(math.MinInt64)
|
||||
}
|
||||
// The bigger the earliestStartTimeOnNode, the higher the score.
|
||||
return earliestStartTimeOnNode.UnixNano()
|
||||
}
|
||||
|
||||
// Each scoreFunc scores the nodes according to specific rules and keeps the name of the node
|
||||
// with the highest score. If and only if the scoreFunc has more than one node with the highest
|
||||
// score, we will execute the other scoreFunc in order of precedence.
|
||||
scoreFuncs = []func(string) int64{
|
||||
// A node with a minimum highest priority victim is preferable.
|
||||
minHighestPriorityScoreFunc,
|
||||
// A node with the smallest sum of priorities is preferable.
|
||||
minSumPrioritiesScoreFunc,
|
||||
// A node with the minimum number of pods is preferable.
|
||||
minNumPodsScoreFunc,
|
||||
// A node with the latest start time of all highest priority victims is preferable.
|
||||
latestStartTimeScoreFunc,
|
||||
// If there are still ties, then the first Node in the list is selected.
|
||||
}
|
||||
}
|
||||
|
||||
for _, f := range scoreFuncs {
|
||||
selectedNodes := []string{}
|
||||
maxScore := int64(math.MinInt64)
|
||||
for _, node := range allCandidates {
|
||||
score := f(node)
|
||||
if score > maxScore {
|
||||
maxScore = score
|
||||
selectedNodes = []string{}
|
||||
}
|
||||
if score == maxScore {
|
||||
selectedNodes = append(selectedNodes, node)
|
||||
}
|
||||
}
|
||||
if len(selectedNodes) == 1 {
|
||||
return selectedNodes[0]
|
||||
}
|
||||
allCandidates = selectedNodes
|
||||
}
|
||||
|
||||
return allCandidates[0]
|
||||
}
|
||||
|
||||
// GetEarliestPodStartTime returns the earliest start time of all pods that
|
||||
// have the highest priority among all victims.
|
||||
func GetEarliestPodStartTime(tasks []*api.TaskInfo) *metav1.Time {
|
||||
if len(tasks) == 0 {
|
||||
// should not reach here.
|
||||
klog.Background().Error(nil, "victims.Pods is empty. Should not reach here")
|
||||
return nil
|
||||
}
|
||||
|
||||
earliestPodStartTime := GetPodStartTime(tasks[0].Pod)
|
||||
maxPriority := PodPriority(tasks[0].Pod)
|
||||
|
||||
for _, task := range tasks {
|
||||
if podPriority := PodPriority(task.Pod); podPriority == maxPriority {
|
||||
if podStartTime := GetPodStartTime(task.Pod); podStartTime.Before(earliestPodStartTime) {
|
||||
earliestPodStartTime = podStartTime
|
||||
}
|
||||
} else if podPriority > maxPriority {
|
||||
maxPriority = podPriority
|
||||
earliestPodStartTime = GetPodStartTime(task.Pod)
|
||||
}
|
||||
}
|
||||
|
||||
return earliestPodStartTime
|
||||
}
|
||||
|
||||
// GetPodStartTime returns start time of the given pod or current timestamp
|
||||
// if it hasn't started yet.
|
||||
func GetPodStartTime(pod *v1.Pod) *metav1.Time {
|
||||
if pod.Status.StartTime != nil {
|
||||
return pod.Status.StartTime
|
||||
}
|
||||
// Assumed pods and bound pods that haven't started don't have a StartTime yet.
|
||||
return &metav1.Time{Time: time.Now()}
|
||||
}
|
||||
|
|
|
@ -17,10 +17,13 @@ limitations under the License.
|
|||
package preempt
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"testing"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
schedulingv1 "k8s.io/api/scheduling/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
|
||||
"volcano.sh/volcano/cmd/scheduler/app/options"
|
||||
|
@ -29,12 +32,19 @@ import (
|
|||
"volcano.sh/volcano/pkg/scheduler/framework"
|
||||
"volcano.sh/volcano/pkg/scheduler/plugins/conformance"
|
||||
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
|
||||
"volcano.sh/volcano/pkg/scheduler/plugins/predicates"
|
||||
"volcano.sh/volcano/pkg/scheduler/plugins/priority"
|
||||
"volcano.sh/volcano/pkg/scheduler/plugins/proportion"
|
||||
"volcano.sh/volcano/pkg/scheduler/uthelper"
|
||||
"volcano.sh/volcano/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
func init() {
|
||||
options.Default()
|
||||
klog.InitFlags(nil)
|
||||
flag.Set("v", "4")
|
||||
}
|
||||
|
||||
func TestPreempt(t *testing.T) {
|
||||
plugins := map[string]framework.PluginBuilder{
|
||||
conformance.PluginName: conformance.New,
|
||||
|
@ -44,7 +54,6 @@ func TestPreempt(t *testing.T) {
|
|||
}
|
||||
highPrio := util.BuildPriorityClass("high-priority", 100000)
|
||||
lowPrio := util.BuildPriorityClass("low-priority", 10)
|
||||
options.Default()
|
||||
|
||||
tests := []uthelper.TestCommonStruct{
|
||||
{
|
||||
|
@ -173,7 +182,7 @@ func TestPreempt(t *testing.T) {
|
|||
},
|
||||
{
|
||||
// case about issue #2232
|
||||
Name: "preempt low priority job in same queue",
|
||||
Name: "preempt low priority job in same queue but not pod with preemptable=false or higher priority",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 1, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
|
||||
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
|
||||
|
@ -291,7 +300,8 @@ func TestPreempt(t *testing.T) {
|
|||
test.Plugins = plugins
|
||||
test.PriClass = []*schedulingv1.PriorityClass{highPrio, lowPrio}
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
test.RegisterSession(tiers, nil)
|
||||
test.RegisterSession(tiers, []conf.Configuration{{Name: actions[0].Name(),
|
||||
Arguments: map[string]interface{}{EnableTopologyAwarePreemptionKey: false}}})
|
||||
defer test.Close()
|
||||
test.Run(actions)
|
||||
if err := test.CheckAll(i); err != nil {
|
||||
|
@ -300,3 +310,317 @@ func TestPreempt(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTopologyAwarePreempt(t *testing.T) {
|
||||
plugins := map[string]framework.PluginBuilder{
|
||||
conformance.PluginName: conformance.New,
|
||||
gang.PluginName: gang.New,
|
||||
priority.PluginName: priority.New,
|
||||
proportion.PluginName: proportion.New,
|
||||
predicates.PluginName: predicates.New,
|
||||
}
|
||||
highPrio := util.BuildPriorityClass("high-priority", 100000)
|
||||
lowPrio := util.BuildPriorityClass("low-priority", 10)
|
||||
|
||||
tests := []uthelper.TestCommonStruct{
|
||||
{
|
||||
Name: "do not preempt if there are enough idle resources",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroup("pg1", "c1", "q1", 3, map[string]int32{"": 3}, schedulingv1beta1.PodGroupInqueue),
|
||||
},
|
||||
Pods: []*v1.Pod{
|
||||
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
|
||||
},
|
||||
// If there are enough idle resources on the node, then there is no need to preempt anything.
|
||||
Nodes: []*v1.Node{
|
||||
util.BuildNode("n1", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
|
||||
},
|
||||
Queues: []*schedulingv1beta1.Queue{
|
||||
util.BuildQueue("q1", 1, nil),
|
||||
},
|
||||
ExpectEvictNum: 0,
|
||||
},
|
||||
{
|
||||
Name: "do not preempt if job is pipelined",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroup("pg1", "c1", "q1", 1, map[string]int32{"": 2}, schedulingv1beta1.PodGroupInqueue),
|
||||
util.BuildPodGroup("pg2", "c1", "q1", 1, map[string]int32{"": 2}, schedulingv1beta1.PodGroupInqueue),
|
||||
},
|
||||
// Both pg1 and pg2 jobs are pipelined, because enough pods are already running.
|
||||
Pods: []*v1.Pod{
|
||||
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptee3", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptor2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
|
||||
},
|
||||
// All resources on the node will be in use.
|
||||
Nodes: []*v1.Node{
|
||||
util.BuildNode("n1", api.BuildResourceList("3", "3G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
|
||||
},
|
||||
Queues: []*schedulingv1beta1.Queue{
|
||||
util.BuildQueue("q1", 1, nil),
|
||||
},
|
||||
ExpectEvictNum: 0,
|
||||
},
|
||||
{
|
||||
Name: "preempt one task of different job to fit both jobs on one node",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 1, map[string]int32{"": 2}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
|
||||
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 2}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
|
||||
},
|
||||
Pods: []*v1.Pod{
|
||||
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "false"}, make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptor2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
|
||||
},
|
||||
Nodes: []*v1.Node{
|
||||
util.BuildNode("n1", api.BuildResourceList("2", "2G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
|
||||
},
|
||||
Queues: []*schedulingv1beta1.Queue{
|
||||
util.BuildQueue("q1", 1, nil),
|
||||
},
|
||||
ExpectEvicted: []string{"c1/preemptee1"},
|
||||
ExpectEvictNum: 1,
|
||||
},
|
||||
{
|
||||
Name: "preempt enough tasks to fit large task of different job",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 1, map[string]int32{"": 3}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
|
||||
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
|
||||
},
|
||||
// There are 3 cpus and 3G of memory idle and 3 tasks running each consuming 1 cpu and 1G of memory.
|
||||
// Big task requiring 5 cpus and 5G of memory should preempt 2 of 3 running tasks to fit into the node.
|
||||
Pods: []*v1.Pod{
|
||||
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptee3", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "false"}, make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("5", "5G"), "pg2", make(map[string]string), make(map[string]string)),
|
||||
},
|
||||
Nodes: []*v1.Node{
|
||||
util.BuildNode("n1", api.BuildResourceList("6", "6G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
|
||||
},
|
||||
Queues: []*schedulingv1beta1.Queue{
|
||||
util.BuildQueue("q1", 1, nil),
|
||||
},
|
||||
ExpectEvicted: []string{"c1/preemptee2", "c1/preemptee1"},
|
||||
ExpectEvictNum: 2,
|
||||
},
|
||||
{
|
||||
// case about #3161
|
||||
Name: "preempt low priority job in same queue",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
|
||||
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
|
||||
},
|
||||
Pods: []*v1.Pod{
|
||||
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("3", "3G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("3", "3G"), "pg2", make(map[string]string), make(map[string]string)),
|
||||
},
|
||||
Nodes: []*v1.Node{
|
||||
util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
|
||||
},
|
||||
Queues: []*schedulingv1beta1.Queue{
|
||||
util.BuildQueue("q1", 1, api.BuildResourceList("4", "4G")),
|
||||
},
|
||||
ExpectEvicted: []string{"c1/preemptee1"},
|
||||
ExpectEvictNum: 1,
|
||||
},
|
||||
{
|
||||
// case about #3161
|
||||
Name: "preempt low priority job in same queue: allocatable and has enough resource, don't preempt",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 1, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
|
||||
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
|
||||
},
|
||||
Pods: []*v1.Pod{
|
||||
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("3", "3G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("3", "3G"), "pg2", make(map[string]string), make(map[string]string)),
|
||||
},
|
||||
Nodes: []*v1.Node{
|
||||
util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
|
||||
},
|
||||
Queues: []*schedulingv1beta1.Queue{
|
||||
util.BuildQueue("q1", 1, api.BuildResourceList("6", "6G")),
|
||||
},
|
||||
ExpectEvictNum: 0,
|
||||
},
|
||||
{
|
||||
// case about issue #2232
|
||||
Name: "preempt low priority job in same queue but not pod with preemptable=false or higher priority",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 1, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
|
||||
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
|
||||
},
|
||||
Pods: []*v1.Pod{
|
||||
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "false"}, make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
|
||||
util.BuildPodWithPriority("c1", "preemptee3", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string), &highPrio.Value),
|
||||
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
|
||||
},
|
||||
Nodes: []*v1.Node{
|
||||
util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
|
||||
},
|
||||
Queues: []*schedulingv1beta1.Queue{
|
||||
util.BuildQueue("q1", 1, api.BuildResourceList("3", "3G")),
|
||||
},
|
||||
ExpectEvicted: []string{"c1/preemptee2"},
|
||||
ExpectEvictNum: 1,
|
||||
},
|
||||
{
|
||||
// case about #3335
|
||||
Name: "unBestEffort high-priority pod preempt BestEffort low-priority pod in same queue",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
|
||||
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
|
||||
},
|
||||
Pods: []*v1.Pod{
|
||||
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, v1.ResourceList{}, "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("3", "3G"), "pg2", make(map[string]string), make(map[string]string)),
|
||||
},
|
||||
Nodes: []*v1.Node{
|
||||
util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "1"}}...), make(map[string]string)),
|
||||
},
|
||||
Queues: []*schedulingv1beta1.Queue{
|
||||
util.BuildQueue("q1", 1, api.BuildResourceList("6", "6G")),
|
||||
},
|
||||
ExpectEvicted: []string{"c1/preemptee1"},
|
||||
ExpectEvictNum: 1,
|
||||
},
|
||||
{
|
||||
// case about #3335
|
||||
Name: "BestEffort high-priority pod preempt BestEffort low-priority pod in same queue",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
|
||||
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
|
||||
},
|
||||
Pods: []*v1.Pod{
|
||||
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, v1.ResourceList{}, "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
|
||||
util.BuildPod("c1", "preemptor1", "", v1.PodPending, v1.ResourceList{}, "pg2", make(map[string]string), make(map[string]string)),
|
||||
},
|
||||
Nodes: []*v1.Node{
|
||||
util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "1"}}...), make(map[string]string)),
|
||||
},
|
||||
Queues: []*schedulingv1beta1.Queue{
|
||||
util.BuildQueue("q1", 1, api.BuildResourceList("6", "6G")),
|
||||
},
|
||||
ExpectEvicted: []string{"c1/preemptee1"},
|
||||
ExpectEvictNum: 1,
|
||||
},
|
||||
{
|
||||
// case about #3642
|
||||
Name: "can not preempt resources when task preemption policy is never",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, nil, schedulingv1beta1.PodGroupInqueue, "low-priority"),
|
||||
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, nil, schedulingv1beta1.PodGroupInqueue, "high-priority"),
|
||||
},
|
||||
Pods: []*v1.Pod{
|
||||
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
|
||||
util.BuildPodWithPreemptionPolicy("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string), v1.PreemptNever),
|
||||
},
|
||||
Nodes: []*v1.Node{
|
||||
util.BuildNode("n1", api.BuildResourceList("1", "1Gi", []api.ScalarResource{{Name: "pods", Value: "1"}}...), make(map[string]string)),
|
||||
},
|
||||
Queues: []*schedulingv1beta1.Queue{
|
||||
util.BuildQueue("q1", 1, nil),
|
||||
},
|
||||
ExpectEvictNum: 0,
|
||||
ExpectEvicted: []string{}, // no victims should be reclaimed
|
||||
},
|
||||
{
|
||||
Name: "preempt low-priority pod when high-priority pod has PodAntiAffinity conflict",
|
||||
PodGroups: []*schedulingv1beta1.PodGroup{
|
||||
util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
|
||||
util.BuildPodGroupWithPrio("pg2", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"),
|
||||
util.BuildPodGroupWithPrio("pg3", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"),
|
||||
},
|
||||
Pods: []*v1.Pod{
|
||||
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg2", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
|
||||
buildPodWithPodAntiAffinity("c1", "preemptee2", "n1", v1.PodRunning, api.BuildResourceList("1", "1G"), "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true", "test": "test"}, make(map[string]string), "kubernetes.io/hostname"),
|
||||
|
||||
buildPodWithPodAntiAffinity("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg3", map[string]string{"test": "test"}, make(map[string]string), "kubernetes.io/hostname"),
|
||||
},
|
||||
Nodes: []*v1.Node{
|
||||
util.BuildNode("n1", api.BuildResourceList("2", "2Gi", []api.ScalarResource{{Name: "pods", Value: "2"}}...), map[string]string{"kubernetes.io/hostname": "n1"}),
|
||||
},
|
||||
Queues: []*schedulingv1beta1.Queue{
|
||||
util.BuildQueue("q1", 1, api.BuildResourceList("2", "2G")),
|
||||
},
|
||||
ExpectEvictNum: 1,
|
||||
ExpectEvicted: []string{"c1/preemptee2"},
|
||||
},
|
||||
}
|
||||
|
||||
trueValue := true
|
||||
tiers := []conf.Tier{
|
||||
{
|
||||
Plugins: []conf.PluginOption{
|
||||
{
|
||||
Name: conformance.PluginName,
|
||||
EnabledPreemptable: &trueValue,
|
||||
},
|
||||
{
|
||||
Name: gang.PluginName,
|
||||
EnabledPreemptable: &trueValue,
|
||||
EnabledJobPipelined: &trueValue,
|
||||
EnabledJobStarving: &trueValue,
|
||||
},
|
||||
{
|
||||
Name: priority.PluginName,
|
||||
EnabledTaskOrder: &trueValue,
|
||||
EnabledJobOrder: &trueValue,
|
||||
EnabledPreemptable: &trueValue,
|
||||
EnabledJobPipelined: &trueValue,
|
||||
EnabledJobStarving: &trueValue,
|
||||
},
|
||||
{
|
||||
Name: proportion.PluginName,
|
||||
EnabledOverused: &trueValue,
|
||||
EnabledAllocatable: &trueValue,
|
||||
EnabledQueueOrder: &trueValue,
|
||||
EnabledPredicate: &trueValue,
|
||||
},
|
||||
{
|
||||
Name: predicates.PluginName,
|
||||
EnabledPreemptable: &trueValue,
|
||||
EnabledPredicate: &trueValue,
|
||||
},
|
||||
},
|
||||
}}
|
||||
|
||||
actions := []framework.Action{New()}
|
||||
for i, test := range tests {
|
||||
test.Plugins = plugins
|
||||
test.PriClass = []*schedulingv1.PriorityClass{highPrio, lowPrio}
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
test.RegisterSession(tiers, []conf.Configuration{{Name: actions[0].Name(),
|
||||
Arguments: map[string]interface{}{EnableTopologyAwarePreemptionKey: true}}})
|
||||
defer test.Close()
|
||||
test.Run(actions)
|
||||
if err := test.CheckAll(i); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func buildPodWithPodAntiAffinity(name, namespace, node string, phase v1.PodPhase, req v1.ResourceList, groupName string, labels map[string]string, selector map[string]string, topologyKey string) *v1.Pod {
|
||||
pod := util.BuildPod(name, namespace, node, phase, req, groupName, labels, selector)
|
||||
|
||||
pod.Spec.Affinity = &v1.Affinity{
|
||||
PodAntiAffinity: &v1.PodAntiAffinity{
|
||||
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
|
||||
{
|
||||
LabelSelector: &metav1.LabelSelector{
|
||||
MatchLabels: labels,
|
||||
},
|
||||
TopologyKey: topologyKey,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return pod
|
||||
}
|
||||
|
|
|
@ -137,7 +137,7 @@ func (ra *Action) Execute(ssn *framework.Session) {
|
|||
|
||||
assigned := false
|
||||
// we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action
|
||||
totalNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(task)
|
||||
totalNodes := ssn.FilterOutUnschedulableAndUnresolvableNodesForTask(task)
|
||||
for _, n := range totalNodes {
|
||||
// When filtering candidate nodes, need to consider the node statusSets instead of the err information.
|
||||
// refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422
|
||||
|
|
|
@ -17,6 +17,8 @@ limitations under the License.
|
|||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
|
@ -165,10 +167,46 @@ type Status struct {
|
|||
}
|
||||
|
||||
// String represents status string
|
||||
func (s Status) String() string {
|
||||
func (s *Status) String() string {
|
||||
return s.Reason
|
||||
}
|
||||
|
||||
// IsSuccess returns true if and only if "Status" is nil or Code is "Success".
|
||||
func (s *Status) IsSuccess() bool {
|
||||
return s == nil || s.Code == Success
|
||||
}
|
||||
|
||||
// IsWait returns true if and only if "Status" is nil or Code is "Wait".
|
||||
func (s *Status) IsWait() bool {
|
||||
return s.Code == Wait
|
||||
}
|
||||
|
||||
// IsSkip returns true if and only if "Status" is nil or Code is "Skip".
|
||||
func (s *Status) IsSkip() bool {
|
||||
return s.Code == Skip
|
||||
}
|
||||
|
||||
// AsError returns nil if the status is a success, a wait or a skip; otherwise returns an "error" object
|
||||
// with a concatenated message on reasons of the Status.
|
||||
func (s *Status) AsError() error {
|
||||
if s.IsSuccess() || s.IsWait() || s.IsSkip() {
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.New(s.String())
|
||||
}
|
||||
|
||||
// AsStatus wraps an error in a Status.
|
||||
func AsStatus(err error) *Status {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
return &Status{
|
||||
Code: Error,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
type StatusSets []*Status
|
||||
|
||||
func (s StatusSets) ContainsUnschedulable() bool {
|
||||
|
@ -320,3 +358,17 @@ type VictimTasksFn func([]*TaskInfo) []*TaskInfo
|
|||
|
||||
// AllocatableFn is the func declaration used to check whether the task can be allocated
|
||||
type AllocatableFn func(*QueueInfo, *TaskInfo) bool
|
||||
|
||||
// SimulateRemoveTaskFn is the func declaration used to simulate the result of removing a task from a node.
|
||||
type SimulateRemoveTaskFn func(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *TaskInfo, taskInfoToRemove *TaskInfo, nodeInfo *NodeInfo) error
|
||||
|
||||
// SimulateAddTaskFn is the func declaration used to simulate the result of adding a task to a node.
|
||||
type SimulateAddTaskFn func(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *TaskInfo, taskInfoToAdd *TaskInfo, nodeInfo *NodeInfo) error
|
||||
|
||||
// Simulate the predicate check for a task on a node.
|
||||
// Plugins implement this function to verify if the task can be scheduled to the node while maintaining topology constraints
|
||||
type SimulatePredicateFn func(ctx context.Context, state *k8sframework.CycleState, task *TaskInfo, nodeInfo *NodeInfo) error
|
||||
|
||||
// Simulate the allocatable check for a node
|
||||
// Plugins implement this function to verify if the queue has enough resources to schedule the task while maintaining topology constraints
|
||||
type SimulateAllocatableFn func(ctx context.Context, state *k8sframework.CycleState, queue *QueueInfo, task *TaskInfo) bool
|
||||
|
|
|
@ -59,7 +59,6 @@ import (
|
|||
"volcano.sh/apis/pkg/apis/scheduling"
|
||||
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
|
||||
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
|
||||
v1beta1apply "volcano.sh/apis/pkg/client/applyconfiguration/scheduling/v1beta1"
|
||||
vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
|
||||
"volcano.sh/apis/pkg/client/clientset/versioned/scheme"
|
||||
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
|
||||
|
@ -71,7 +70,6 @@ import (
|
|||
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
|
||||
"volcano.sh/volcano/pkg/scheduler/metrics"
|
||||
"volcano.sh/volcano/pkg/scheduler/metrics/source"
|
||||
"volcano.sh/volcano/pkg/scheduler/util"
|
||||
commonutil "volcano.sh/volcano/pkg/util"
|
||||
)
|
||||
|
||||
|
@ -262,6 +260,19 @@ func (de *defaultEvictor) Evict(p *v1.Pod, reason string) error {
|
|||
klog.V(1).Infof("%+v", pod.Status.Conditions)
|
||||
return nil
|
||||
}
|
||||
|
||||
condition = &v1.PodCondition{
|
||||
Type: v1.DisruptionTarget,
|
||||
Status: v1.ConditionTrue,
|
||||
Reason: v1.PodReasonPreemptionByScheduler,
|
||||
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName),
|
||||
}
|
||||
if !podutil.UpdatePodCondition(&pod.Status, condition) {
|
||||
klog.V(1).Infof("UpdatePodCondition: existed condition, not update")
|
||||
klog.V(1).Infof("%+v", pod.Status.Conditions)
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := de.kubeclient.CoreV1().Pods(p.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
|
||||
klog.Errorf("Failed to update pod <%v/%v> status: %v", pod.Namespace, pod.Name, err)
|
||||
return err
|
||||
|
@ -345,9 +356,7 @@ func (su *defaultStatusUpdater) UpdateQueueStatus(queue *schedulingapi.QueueInfo
|
|||
return err
|
||||
}
|
||||
|
||||
queueStatusApply := v1beta1apply.QueueStatus().WithAllocated(newQueue.Status.Allocated)
|
||||
queueApply := v1beta1apply.Queue(newQueue.Name).WithStatus(queueStatusApply)
|
||||
_, err := su.vcclient.SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: util.DefaultComponentName})
|
||||
_, err := su.vcclient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("error occurred in updating Queue <%s>: %s", newQueue.Name, err.Error())
|
||||
return err
|
||||
|
|
|
@ -47,6 +47,9 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Con
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
ssn.InitCycleState()
|
||||
|
||||
return ssn
|
||||
}
|
||||
|
||||
|
|
|
@ -38,7 +38,6 @@ import (
|
|||
"volcano.sh/apis/pkg/apis/scheduling"
|
||||
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
|
||||
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
|
||||
v1beta1apply "volcano.sh/apis/pkg/client/applyconfiguration/scheduling/v1beta1"
|
||||
vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
|
||||
"volcano.sh/volcano/pkg/scheduler/api"
|
||||
"volcano.sh/volcano/pkg/scheduler/cache"
|
||||
|
@ -110,17 +109,21 @@ type Session struct {
|
|||
overusedFns map[string]api.ValidateFn
|
||||
// preemptiveFns means whether current queue can reclaim from other queue,
|
||||
// while reclaimableFns means whether current queue's resources can be reclaimed.
|
||||
preemptiveFns map[string]api.ValidateWithCandidateFn
|
||||
allocatableFns map[string]api.AllocatableFn
|
||||
jobReadyFns map[string]api.ValidateFn
|
||||
jobPipelinedFns map[string]api.VoteFn
|
||||
jobValidFns map[string]api.ValidateExFn
|
||||
jobEnqueueableFns map[string]api.VoteFn
|
||||
jobEnqueuedFns map[string]api.JobEnqueuedFn
|
||||
targetJobFns map[string]api.TargetJobFn
|
||||
reservedNodesFns map[string]api.ReservedNodesFn
|
||||
victimTasksFns map[string][]api.VictimTasksFn
|
||||
jobStarvingFns map[string]api.ValidateFn
|
||||
preemptiveFns map[string]api.ValidateWithCandidateFn
|
||||
allocatableFns map[string]api.AllocatableFn
|
||||
jobReadyFns map[string]api.ValidateFn
|
||||
jobPipelinedFns map[string]api.VoteFn
|
||||
jobValidFns map[string]api.ValidateExFn
|
||||
jobEnqueueableFns map[string]api.VoteFn
|
||||
jobEnqueuedFns map[string]api.JobEnqueuedFn
|
||||
targetJobFns map[string]api.TargetJobFn
|
||||
reservedNodesFns map[string]api.ReservedNodesFn
|
||||
victimTasksFns map[string][]api.VictimTasksFn
|
||||
jobStarvingFns map[string]api.ValidateFn
|
||||
simulateRemoveTaskFns map[string]api.SimulateRemoveTaskFn
|
||||
simulateAddTaskFns map[string]api.SimulateAddTaskFn
|
||||
simulatePredicateFns map[string]api.SimulatePredicateFn
|
||||
simulateAllocatableFns map[string]api.SimulateAllocatableFn
|
||||
|
||||
// cycleStatesMap is used to temporarily store the scheduling status of each pod, its life cycle is same as Session.
|
||||
// Because state needs to be passed between different extension points (not only used in PreFilter and Filter),
|
||||
|
@ -152,34 +155,38 @@ func openSession(cache cache.Cache) *Session {
|
|||
RevocableNodes: map[string]*api.NodeInfo{},
|
||||
Queues: map[api.QueueID]*api.QueueInfo{},
|
||||
|
||||
plugins: map[string]Plugin{},
|
||||
jobOrderFns: map[string]api.CompareFn{},
|
||||
queueOrderFns: map[string]api.CompareFn{},
|
||||
victimQueueOrderFns: map[string]api.VictimCompareFn{},
|
||||
taskOrderFns: map[string]api.CompareFn{},
|
||||
clusterOrderFns: map[string]api.CompareFn{},
|
||||
predicateFns: map[string]api.PredicateFn{},
|
||||
prePredicateFns: map[string]api.PrePredicateFn{},
|
||||
bestNodeFns: map[string]api.BestNodeFn{},
|
||||
nodeOrderFns: map[string]api.NodeOrderFn{},
|
||||
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
|
||||
nodeMapFns: map[string]api.NodeMapFn{},
|
||||
nodeReduceFns: map[string]api.NodeReduceFn{},
|
||||
hyperNodeOrderFns: map[string]api.HyperNodeOrderFn{},
|
||||
preemptableFns: map[string]api.EvictableFn{},
|
||||
reclaimableFns: map[string]api.EvictableFn{},
|
||||
overusedFns: map[string]api.ValidateFn{},
|
||||
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
|
||||
allocatableFns: map[string]api.AllocatableFn{},
|
||||
jobReadyFns: map[string]api.ValidateFn{},
|
||||
jobPipelinedFns: map[string]api.VoteFn{},
|
||||
jobValidFns: map[string]api.ValidateExFn{},
|
||||
jobEnqueueableFns: map[string]api.VoteFn{},
|
||||
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
|
||||
targetJobFns: map[string]api.TargetJobFn{},
|
||||
reservedNodesFns: map[string]api.ReservedNodesFn{},
|
||||
victimTasksFns: map[string][]api.VictimTasksFn{},
|
||||
jobStarvingFns: map[string]api.ValidateFn{},
|
||||
plugins: map[string]Plugin{},
|
||||
jobOrderFns: map[string]api.CompareFn{},
|
||||
queueOrderFns: map[string]api.CompareFn{},
|
||||
victimQueueOrderFns: map[string]api.VictimCompareFn{},
|
||||
taskOrderFns: map[string]api.CompareFn{},
|
||||
clusterOrderFns: map[string]api.CompareFn{},
|
||||
predicateFns: map[string]api.PredicateFn{},
|
||||
prePredicateFns: map[string]api.PrePredicateFn{},
|
||||
bestNodeFns: map[string]api.BestNodeFn{},
|
||||
nodeOrderFns: map[string]api.NodeOrderFn{},
|
||||
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
|
||||
nodeMapFns: map[string]api.NodeMapFn{},
|
||||
nodeReduceFns: map[string]api.NodeReduceFn{},
|
||||
hyperNodeOrderFns: map[string]api.HyperNodeOrderFn{},
|
||||
preemptableFns: map[string]api.EvictableFn{},
|
||||
reclaimableFns: map[string]api.EvictableFn{},
|
||||
overusedFns: map[string]api.ValidateFn{},
|
||||
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
|
||||
allocatableFns: map[string]api.AllocatableFn{},
|
||||
jobReadyFns: map[string]api.ValidateFn{},
|
||||
jobPipelinedFns: map[string]api.VoteFn{},
|
||||
jobValidFns: map[string]api.ValidateExFn{},
|
||||
jobEnqueueableFns: map[string]api.VoteFn{},
|
||||
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
|
||||
targetJobFns: map[string]api.TargetJobFn{},
|
||||
reservedNodesFns: map[string]api.ReservedNodesFn{},
|
||||
victimTasksFns: map[string][]api.VictimTasksFn{},
|
||||
jobStarvingFns: map[string]api.ValidateFn{},
|
||||
simulateRemoveTaskFns: map[string]api.SimulateRemoveTaskFn{},
|
||||
simulateAddTaskFns: map[string]api.SimulateAddTaskFn{},
|
||||
simulatePredicateFns: map[string]api.SimulatePredicateFn{},
|
||||
simulateAllocatableFns: map[string]api.SimulateAllocatableFn{},
|
||||
}
|
||||
|
||||
snapshot := cache.Snapshot()
|
||||
|
@ -204,11 +211,14 @@ func openSession(cache cache.Cache) *Session {
|
|||
ssn.NamespaceInfo = snapshot.NamespaceInfo
|
||||
// calculate all nodes' resource only once in each schedule cycle, other plugins can clone it when need
|
||||
for _, n := range ssn.Nodes {
|
||||
if isNodeUnschedulable(n.Node) || isNodeNotReady(n.Node) {
|
||||
klog.V(3).Infof("node %s is not ready or unschedulable, need to continue", n.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
ssn.TotalResource.Add(n.Allocatable)
|
||||
}
|
||||
|
||||
ssn.InitCycleState()
|
||||
|
||||
klog.V(3).Infof("Open Session %v with <%d> Job and <%d> Queues",
|
||||
ssn.UID, len(ssn.Jobs), len(ssn.Queues))
|
||||
|
||||
|
@ -317,9 +327,8 @@ func updateRootQueueResources(ssn *Session, allocated v1.ResourceList) {
|
|||
}
|
||||
|
||||
if !equality.Semantic.DeepEqual(queue.Status.Allocated, allocated) {
|
||||
queueStatusApply := v1beta1apply.QueueStatus().WithAllocated(allocated)
|
||||
queueApply := v1beta1apply.Queue(queue.Name).WithStatus(queueStatusApply)
|
||||
_, err = ssn.VCClient().SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: util.DefaultComponentName})
|
||||
queue.Status.Allocated = allocated
|
||||
_, err = ssn.VCClient().SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), queue, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("failed to update root queue status: %s", err.Error())
|
||||
return
|
||||
|
@ -390,8 +399,8 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) scheduling.PodGroupStatus {
|
|||
return status
|
||||
}
|
||||
|
||||
// GetUnschedulableAndUnresolvableNodesForTask filter out those node that has UnschedulableAndUnresolvable
|
||||
func (ssn *Session) GetUnschedulableAndUnresolvableNodesForTask(task *api.TaskInfo) []*api.NodeInfo {
|
||||
// FilterOutUnschedulableAndUnresolvableNodesForTask filter out those node that has UnschedulableAndUnresolvable
|
||||
func (ssn *Session) FilterOutUnschedulableAndUnresolvableNodesForTask(task *api.TaskInfo) []*api.NodeInfo {
|
||||
fitErrors, ok1 := ssn.Jobs[task.Job]
|
||||
if !ok1 {
|
||||
return ssn.NodeList
|
||||
|
|
|
@ -17,6 +17,8 @@ limitations under the License.
|
|||
package framework
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
|
||||
"volcano.sh/apis/pkg/apis/scheduling"
|
||||
|
@ -160,6 +162,22 @@ func (ssn *Session) AddJobStarvingFns(name string, fn api.ValidateFn) {
|
|||
ssn.jobStarvingFns[name] = fn
|
||||
}
|
||||
|
||||
func (ssn *Session) AddSimulateAddTaskFn(name string, fn api.SimulateAddTaskFn) {
|
||||
ssn.simulateAddTaskFns[name] = fn
|
||||
}
|
||||
|
||||
func (ssn *Session) AddSimulateRemoveTaskFn(name string, fn api.SimulateRemoveTaskFn) {
|
||||
ssn.simulateRemoveTaskFns[name] = fn
|
||||
}
|
||||
|
||||
func (ssn *Session) AddSimulateAllocatableFn(name string, fn api.SimulateAllocatableFn) {
|
||||
ssn.simulateAllocatableFns[name] = fn
|
||||
}
|
||||
|
||||
func (ssn *Session) AddSimulatePredicateFn(name string, fn api.SimulatePredicateFn) {
|
||||
ssn.simulatePredicateFns[name] = fn
|
||||
}
|
||||
|
||||
// Reclaimable invoke reclaimable function of the plugins
|
||||
func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
|
||||
var victims []*api.TaskInfo
|
||||
|
@ -670,6 +688,85 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SimulateAllocatableFn invoke simulateAllocatableFn function of the plugins
|
||||
func (ssn *Session) SimulateAllocatableFn(ctx context.Context, state *k8sframework.CycleState, queue *api.QueueInfo, task *api.TaskInfo) bool {
|
||||
for _, tier := range ssn.Tiers {
|
||||
for _, plugin := range tier.Plugins {
|
||||
if !isEnabled(plugin.EnabledAllocatable) {
|
||||
continue
|
||||
}
|
||||
caf, found := ssn.simulateAllocatableFns[plugin.Name]
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
if !caf(ctx, state, queue, task) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// SimulatePredicateFn invoke simulatePredicateFn function of the plugins
|
||||
func (ssn *Session) SimulatePredicateFn(ctx context.Context, state *k8sframework.CycleState, task *api.TaskInfo, node *api.NodeInfo) error {
|
||||
for _, tier := range ssn.Tiers {
|
||||
for _, plugin := range tier.Plugins {
|
||||
if !isEnabled(plugin.EnabledPredicate) {
|
||||
continue
|
||||
}
|
||||
pfn, found := ssn.simulatePredicateFns[plugin.Name]
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
err := pfn(ctx, state, task, node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SimulateRemoveTaskFn invoke simulateRemoveTaskFn function of the plugins
|
||||
func (ssn *Session) SimulateRemoveTaskFn(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToRemove *api.TaskInfo, nodeInfo *api.NodeInfo) error {
|
||||
for _, tier := range ssn.Tiers {
|
||||
for _, plugin := range tier.Plugins {
|
||||
if !isEnabled(plugin.EnabledPreemptable) && !isEnabled(plugin.EnabledAllocatable) {
|
||||
continue
|
||||
}
|
||||
pfn, found := ssn.simulateRemoveTaskFns[plugin.Name]
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
err := pfn(ctx, state, taskToSchedule, taskToRemove, nodeInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SimulateAddTaskFn invoke simulateAddTaskFn function of the plugins
|
||||
func (ssn *Session) SimulateAddTaskFn(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToAdd *api.TaskInfo, nodeInfo *api.NodeInfo) error {
|
||||
for _, tier := range ssn.Tiers {
|
||||
for _, plugin := range tier.Plugins {
|
||||
if !isEnabled(plugin.EnabledPreemptable) && !isEnabled(plugin.EnabledAllocatable) {
|
||||
continue
|
||||
}
|
||||
pfn, found := ssn.simulateAddTaskFns[plugin.Name]
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
err := pfn(ctx, state, taskToSchedule, taskToAdd, nodeInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PrePredicateFn invoke predicate function of the plugins
|
||||
func (ssn *Session) PrePredicateFn(task *api.TaskInfo) error {
|
||||
for _, tier := range ssn.Tiers {
|
||||
|
|
|
@ -124,7 +124,7 @@ func TestFilterOutPreemptMayNotHelpNodes(t *testing.T) {
|
|||
}
|
||||
|
||||
// check potential nodes
|
||||
potentialNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(task)
|
||||
potentialNodes := ssn.FilterOutUnschedulableAndUnresolvableNodesForTask(task)
|
||||
want := test.want[task.UID]
|
||||
got := make([]string, 0, len(potentialNodes))
|
||||
for _, node := range potentialNodes {
|
||||
|
|
|
@ -262,3 +262,22 @@ func (nl *NodeLister) List() ([]*v1.Node, error) {
|
|||
}
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
func isNodeUnschedulable(node *v1.Node) bool {
|
||||
if node == nil {
|
||||
return true
|
||||
}
|
||||
return node.Spec.Unschedulable
|
||||
}
|
||||
|
||||
func isNodeNotReady(node *v1.Node) bool {
|
||||
if node == nil {
|
||||
return true
|
||||
}
|
||||
for _, cond := range node.Status.Conditions {
|
||||
if cond.Type == v1.NodeReady {
|
||||
return cond.Status != v1.ConditionTrue
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
package framework
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func makeNode(unschedulable bool, readyCondition v1.ConditionStatus, nodeIsNil bool) *v1.Node {
|
||||
if nodeIsNil {
|
||||
return nil
|
||||
}
|
||||
node := &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-node",
|
||||
},
|
||||
Spec: v1.NodeSpec{
|
||||
Unschedulable: unschedulable,
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
Conditions: []v1.NodeCondition{
|
||||
{
|
||||
Type: v1.NodeReady,
|
||||
Status: readyCondition,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return node
|
||||
}
|
||||
|
||||
func makeNodeWithoutReady() *v1.Node {
|
||||
return &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-node-no-ready",
|
||||
},
|
||||
Spec: v1.NodeSpec{
|
||||
Unschedulable: false,
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
Conditions: []v1.NodeCondition{},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsNodeUnschedulable(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input bool
|
||||
expected bool
|
||||
nodeIsNil bool
|
||||
}{
|
||||
{"unschedulable true", true, true, false},
|
||||
{"unschedulable false", false, false, false},
|
||||
{"node is nil", true, true, true},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
node := makeNode(tt.input, v1.ConditionTrue, tt.nodeIsNil)
|
||||
result := isNodeUnschedulable(node)
|
||||
if result != tt.expected {
|
||||
t.Errorf("expected %v, got %v", tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsNodeNotReady(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
ready v1.ConditionStatus
|
||||
expected bool
|
||||
nodeIsNil bool
|
||||
}{
|
||||
{"NodeReady=True", v1.ConditionTrue, false, false},
|
||||
{"NodeReady=False", v1.ConditionFalse, true, false},
|
||||
{"NodeReady=Unknown", v1.ConditionUnknown, true, false},
|
||||
{"node is nil", "", true, true},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
node := makeNode(false, tt.ready, false)
|
||||
result := isNodeNotReady(node)
|
||||
if result != tt.expected {
|
||||
t.Errorf("expected %v, got %v", tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("No NodeReady condition", func(t *testing.T) {
|
||||
node := makeNodeWithoutReady()
|
||||
result := isNodeNotReady(node)
|
||||
if result != true {
|
||||
t.Errorf("expected true when no NodeReady condition, got %v", result)
|
||||
}
|
||||
})
|
||||
}
|
|
@ -17,11 +17,13 @@ limitations under the License.
|
|||
package capacity
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/klog/v2"
|
||||
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
|
||||
"volcano.sh/apis/pkg/apis/scheduling"
|
||||
|
||||
|
@ -33,8 +35,12 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
PluginName = "capacity"
|
||||
rootQueueID = "root"
|
||||
PluginName = "capacity"
|
||||
|
||||
// preFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data for Filtering.
|
||||
// Using the name of the plugin will likely help us avoid collisions with other plugins.
|
||||
capacityStateKey = PluginName
|
||||
rootQueueID = "root"
|
||||
)
|
||||
|
||||
type capacityPlugin struct {
|
||||
|
@ -231,6 +237,96 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
|
|||
return util.Permit
|
||||
})
|
||||
|
||||
ssn.AddPrePredicateFn(cp.Name(), func(task *api.TaskInfo) error {
|
||||
state := &capacityState{
|
||||
queueAttrs: make(map[api.QueueID]*queueAttr),
|
||||
}
|
||||
|
||||
for _, queue := range cp.queueOpts {
|
||||
state.queueAttrs[queue.queueID] = queue.Clone()
|
||||
}
|
||||
|
||||
ssn.GetCycleState(task.UID).Write(capacityStateKey, state)
|
||||
return nil
|
||||
})
|
||||
|
||||
ssn.AddSimulateAddTaskFn(cp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToAdd *api.TaskInfo, nodeInfo *api.NodeInfo) error {
|
||||
state, err := getCapacityState(cycleState)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get capacity state: %w", err)
|
||||
}
|
||||
|
||||
job := ssn.Jobs[taskToAdd.Job]
|
||||
attr := state.queueAttrs[job.Queue]
|
||||
if attr == nil {
|
||||
return fmt.Errorf("queue %s not found", job.Queue)
|
||||
}
|
||||
attr.allocated.Add(taskToAdd.Resreq)
|
||||
updateQueueAttrShare(attr)
|
||||
if hierarchyEnabled {
|
||||
for _, ancestorID := range attr.ancestors {
|
||||
ancestorAttr := state.queueAttrs[ancestorID]
|
||||
ancestorAttr.allocated.Add(taskToAdd.Resreq)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
ssn.AddSimulateRemoveTaskFn(cp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToRemove *api.TaskInfo, nodeInfo *api.NodeInfo) error {
|
||||
state, err := getCapacityState(cycleState)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get capacity state: %w", err)
|
||||
}
|
||||
job := ssn.Jobs[taskToRemove.Job]
|
||||
attr := state.queueAttrs[job.Queue]
|
||||
if attr == nil {
|
||||
return fmt.Errorf("queue %s not found", job.Queue)
|
||||
}
|
||||
attr.allocated.Sub(taskToRemove.Resreq)
|
||||
updateQueueAttrShare(attr)
|
||||
if hierarchyEnabled {
|
||||
for _, ancestorID := range attr.ancestors {
|
||||
ancestorAttr := state.queueAttrs[ancestorID]
|
||||
ancestorAttr.allocated.Sub(taskToRemove.Resreq)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
ssn.AddSimulateAllocatableFn(cp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, queue *api.QueueInfo, candidate *api.TaskInfo) bool {
|
||||
state, err := getCapacityState(cycleState)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if !readyToSchedule {
|
||||
klog.V(3).Infof("Capacity plugin failed to check queue's hierarchical structure!")
|
||||
return false
|
||||
}
|
||||
if hierarchyEnabled && !cp.isLeafQueue(queue.UID) {
|
||||
klog.V(3).Infof("Queue <%s> is not a leaf queue, can not allocate task <%s>.", queue.Name, candidate.Name)
|
||||
return false
|
||||
}
|
||||
|
||||
simulateQueueAllocatable := func(state *capacityState, queue *api.QueueInfo, candidate *api.TaskInfo) bool {
|
||||
attr := state.queueAttrs[queue.UID]
|
||||
return queueAllocatable(attr, candidate, queue)
|
||||
}
|
||||
|
||||
list := append(state.queueAttrs[queue.UID].ancestors, queue.UID)
|
||||
for i := len(list) - 1; i >= 0; i-- {
|
||||
if !simulateQueueAllocatable(state, ssn.Queues[list[i]], candidate) {
|
||||
if klog.V(5).Enabled() {
|
||||
for i--; i >= 0; i-- {
|
||||
simulateQueueAllocatable(state, ssn.Queues[list[i]], candidate)
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
// Register event handlers.
|
||||
ssn.AddEventHandler(&framework.EventHandler{
|
||||
AllocateFunc: func(event *framework.Event) {
|
||||
|
@ -315,6 +411,7 @@ func (cp *capacityPlugin) buildQueueAttrs(ssn *framework.Session) {
|
|||
}
|
||||
realCapability := api.ExceededPart(cp.totalResource, cp.totalGuarantee).Add(attr.guarantee)
|
||||
if attr.capability == nil {
|
||||
attr.capability = api.EmptyResource()
|
||||
attr.realCapability = realCapability
|
||||
} else {
|
||||
realCapability.MinDimensionResource(attr.capability, api.Infinity)
|
||||
|
@ -683,6 +780,7 @@ func (cp *capacityPlugin) checkHierarchicalQueue(attr *queueAttr) error {
|
|||
for _, childAttr := range attr.children {
|
||||
realCapability := api.ExceededPart(attr.realCapability, totalGuarantee).Add(childAttr.guarantee)
|
||||
if childAttr.capability == nil {
|
||||
childAttr.capability = api.EmptyResource()
|
||||
childAttr.realCapability = realCapability
|
||||
} else {
|
||||
realCapability.MinDimensionResource(childAttr.capability, api.Infinity)
|
||||
|
@ -716,13 +814,7 @@ func (cp *capacityPlugin) checkHierarchicalQueue(attr *queueAttr) error {
|
|||
}
|
||||
|
||||
func (cp *capacityPlugin) updateShare(attr *queueAttr) {
|
||||
res := float64(0)
|
||||
|
||||
for _, rn := range attr.deserved.ResourceNames() {
|
||||
res = max(res, helpers.Share(attr.allocated.Get(rn), attr.deserved.Get(rn)))
|
||||
}
|
||||
|
||||
attr.share = res
|
||||
updateQueueAttrShare(attr)
|
||||
metrics.UpdateQueueShare(attr.name, attr.share)
|
||||
}
|
||||
|
||||
|
@ -732,6 +824,10 @@ func (cp *capacityPlugin) isLeafQueue(queueID api.QueueID) bool {
|
|||
|
||||
func (cp *capacityPlugin) queueAllocatable(queue *api.QueueInfo, candidate *api.TaskInfo) bool {
|
||||
attr := cp.queueOpts[queue.UID]
|
||||
return queueAllocatable(attr, candidate, queue)
|
||||
}
|
||||
|
||||
func queueAllocatable(attr *queueAttr, candidate *api.TaskInfo, queue *api.QueueInfo) bool {
|
||||
futureUsed := attr.allocated.Clone().Add(candidate.Resreq)
|
||||
allocatable := futureUsed.LessEqualWithDimension(attr.realCapability, candidate.Resreq)
|
||||
if !allocatable {
|
||||
|
@ -805,3 +901,79 @@ func getQueueLevel(l *queueAttr, r *queueAttr) int {
|
|||
|
||||
return level
|
||||
}
|
||||
|
||||
func getCapacityState(cycleState *k8sframework.CycleState) (*capacityState, error) {
|
||||
c, err := cycleState.Read(capacityStateKey)
|
||||
if err != nil {
|
||||
// preFilterState doesn't exist, likely PreFilter wasn't invoked.
|
||||
return nil, fmt.Errorf("error reading %q from cycleState: %w", capacityStateKey, err)
|
||||
}
|
||||
|
||||
s, ok := c.(*capacityState)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%+v convert to capacity.state error", c)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
type capacityState struct {
|
||||
queueAttrs map[api.QueueID]*queueAttr
|
||||
}
|
||||
|
||||
func (qa *queueAttr) Clone() *queueAttr {
|
||||
if qa == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
cloned := &queueAttr{
|
||||
queueID: qa.queueID,
|
||||
name: qa.name,
|
||||
share: qa.share,
|
||||
deserved: qa.deserved.Clone(),
|
||||
allocated: qa.allocated.Clone(),
|
||||
request: qa.request.Clone(),
|
||||
elastic: qa.elastic.Clone(),
|
||||
inqueue: qa.inqueue.Clone(),
|
||||
capability: qa.capability.Clone(),
|
||||
realCapability: qa.realCapability.Clone(),
|
||||
guarantee: qa.guarantee.Clone(),
|
||||
children: make(map[api.QueueID]*queueAttr),
|
||||
}
|
||||
|
||||
if len(qa.ancestors) > 0 {
|
||||
cloned.ancestors = make([]api.QueueID, len(qa.ancestors))
|
||||
copy(cloned.ancestors, qa.ancestors)
|
||||
}
|
||||
|
||||
for childID, childNode := range qa.children {
|
||||
cloned.children[childID] = childNode.Clone()
|
||||
}
|
||||
|
||||
return cloned
|
||||
}
|
||||
|
||||
func (s *capacityState) Clone() k8sframework.StateData {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
newState := &capacityState{
|
||||
queueAttrs: make(map[api.QueueID]*queueAttr, len(s.queueAttrs)),
|
||||
}
|
||||
|
||||
for qID, qa := range s.queueAttrs {
|
||||
newState.queueAttrs[qID] = qa.Clone()
|
||||
}
|
||||
|
||||
return newState
|
||||
}
|
||||
|
||||
func updateQueueAttrShare(attr *queueAttr) {
|
||||
res := float64(0)
|
||||
|
||||
for _, rn := range attr.deserved.ResourceNames() {
|
||||
res = max(res, helpers.Share(attr.allocated.Get(rn), attr.deserved.Get(rn)))
|
||||
}
|
||||
|
||||
attr.share = res
|
||||
}
|
||||
|
|
|
@ -71,3 +71,11 @@ type JobReadyRequest struct {
|
|||
type JobReadyResponse struct {
|
||||
Status bool `json:"status"`
|
||||
}
|
||||
|
||||
type EventHandlerRequest struct {
|
||||
Task *api.TaskInfo `json:"task"`
|
||||
}
|
||||
|
||||
type EventHandlerResponse struct {
|
||||
ErrorMessage string `json:"errorMessage"`
|
||||
}
|
||||
|
|
|
@ -58,6 +58,10 @@ const (
|
|||
ExtenderJobEnqueueableVerb = "extender.jobEnqueueableVerb"
|
||||
// ExtenderJobReadyVerb is the verb of JobReady method
|
||||
ExtenderJobReadyVerb = "extender.jobReadyVerb"
|
||||
// ExtenderAllocateFuncVerb is the verb of AllocateFunc method
|
||||
ExtenderAllocateFuncVerb = "extender.allocateFuncVerb"
|
||||
// ExtenderDeallocateFuncVerb is the verb of DeallocateFunc method
|
||||
ExtenderDeallocateFuncVerb = "extender.deallocateFuncVerb"
|
||||
// ExtenderIgnorable indicates whether the extender can ignore unexpected errors
|
||||
ExtenderIgnorable = "extender.ignorable"
|
||||
|
||||
|
@ -77,6 +81,8 @@ type extenderConfig struct {
|
|||
queueOverusedVerb string
|
||||
jobEnqueueableVerb string
|
||||
jobReadyVerb string
|
||||
allocateFuncVerb string
|
||||
deallocateFuncVerb string
|
||||
ignorable bool
|
||||
}
|
||||
|
||||
|
@ -123,6 +129,8 @@ func parseExtenderConfig(arguments framework.Arguments) *extenderConfig {
|
|||
ec.queueOverusedVerb, _ = arguments[ExtenderQueueOverusedVerb].(string)
|
||||
ec.jobEnqueueableVerb, _ = arguments[ExtenderJobEnqueueableVerb].(string)
|
||||
ec.jobReadyVerb, _ = arguments[ExtenderJobReadyVerb].(string)
|
||||
ec.allocateFuncVerb, _ = arguments[ExtenderAllocateFuncVerb].(string)
|
||||
ec.deallocateFuncVerb, _ = arguments[ExtenderDeallocateFuncVerb].(string)
|
||||
|
||||
arguments.GetBool(&ec.ignorable, ExtenderIgnorable)
|
||||
|
||||
|
@ -289,6 +297,8 @@ func (ep *extenderPlugin) OnSessionOpen(ssn *framework.Session) {
|
|||
return resp.Status
|
||||
})
|
||||
}
|
||||
|
||||
addEventHandler(ssn, ep)
|
||||
}
|
||||
|
||||
func (ep *extenderPlugin) OnSessionClose(ssn *framework.Session) {
|
||||
|
@ -330,3 +340,48 @@ func (ep *extenderPlugin) send(action string, args interface{}, result interface
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func addEventHandler(ssn *framework.Session, ep *extenderPlugin) {
|
||||
const (
|
||||
AllocateFunc = "AllocateFunc"
|
||||
DeallocateFunc = "DeallocateFunc"
|
||||
)
|
||||
eventHandlerFunc := func(funcName string) func(event *framework.Event) {
|
||||
return func(event *framework.Event) {
|
||||
if event == nil {
|
||||
klog.Errorf("%s event nil.", funcName)
|
||||
return
|
||||
}
|
||||
resp := &EventHandlerResponse{}
|
||||
var verb string
|
||||
switch funcName {
|
||||
case AllocateFunc:
|
||||
verb = ep.config.allocateFuncVerb
|
||||
case DeallocateFunc:
|
||||
verb = ep.config.deallocateFuncVerb
|
||||
}
|
||||
err := ep.send(verb, &EventHandlerRequest{Task: event.Task}, resp)
|
||||
if err != nil {
|
||||
klog.Warningf("%s failed with error %v", funcName, err)
|
||||
|
||||
if !ep.config.ignorable {
|
||||
event.Err = err
|
||||
}
|
||||
}
|
||||
if resp.ErrorMessage != "" {
|
||||
event.Err = errors.New(resp.ErrorMessage)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var eventHandler framework.EventHandler
|
||||
if ep.config.allocateFuncVerb != "" {
|
||||
eventHandler.AllocateFunc = eventHandlerFunc(AllocateFunc)
|
||||
}
|
||||
|
||||
if ep.config.deallocateFuncVerb != "" {
|
||||
eventHandler.DeallocateFunc = eventHandlerFunc(DeallocateFunc)
|
||||
}
|
||||
|
||||
ssn.AddEventHandler(&eventHandler)
|
||||
}
|
||||
|
|
|
@ -434,6 +434,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
|
|||
// If the filtering logic is added to the Prefile node in the Volumebinding package in the future,
|
||||
// the processing logic needs to be added to the return value result.
|
||||
if predicate.podAffinityEnable {
|
||||
klog.Infof("Executing podAffinityFilter PreFilter for task %s/%s", task.Namespace, task.Name)
|
||||
_, status := podAffinityFilter.PreFilter(context.TODO(), state, task.Pod)
|
||||
if err := handleSkipPrePredicatePlugin(status, state, task, interpodaffinity.Name); err != nil {
|
||||
return err
|
||||
|
@ -590,12 +591,10 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
|
|||
status := podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
|
||||
podAffinityStatus := api.ConvertPredicateStatus(status)
|
||||
if podAffinityStatus.Code != api.Success {
|
||||
// TODO: Currently, preemption is not supported when Pod affinity filtering fails.
|
||||
// Once supported, the logic here should be removed.
|
||||
// See https://github.com/volcano-sh/volcano/issues/3845
|
||||
podAffinityStatus.Code = api.UnschedulableAndUnresolvable
|
||||
predicateStatus = append(predicateStatus, podAffinityStatus)
|
||||
return api.NewFitErrWithStatus(task, node, predicateStatus...)
|
||||
if ShouldAbort(podAffinityStatus) {
|
||||
return api.NewFitErrWithStatus(task, node, predicateStatus...)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -711,6 +710,71 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
|
|||
})
|
||||
|
||||
ssn.RegisterBinder(pp.Name(), pp)
|
||||
|
||||
// Add SimulateAddTask function
|
||||
ssn.AddSimulateAddTaskFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToAdd *api.TaskInfo, nodeInfo *api.NodeInfo) error {
|
||||
podInfoToAdd, err := k8sframework.NewPodInfo(taskToAdd.Pod)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create pod info: %w", err)
|
||||
}
|
||||
|
||||
k8sNodeInfo := k8sframework.NewNodeInfo(nodeInfo.Pods()...)
|
||||
k8sNodeInfo.SetNode(nodeInfo.Node)
|
||||
|
||||
if predicate.podAffinityEnable {
|
||||
isSkipInterPodAffinity := handleSkipPredicatePlugin(cycleState, podAffinityFilter.Name())
|
||||
if !isSkipInterPodAffinity {
|
||||
status := podAffinityFilter.AddPod(ctx, cycleState, taskToSchedule.Pod, podInfoToAdd, k8sNodeInfo)
|
||||
if !status.IsSuccess() {
|
||||
return fmt.Errorf("failed to remove pod from node %s: %w", nodeInfo.Name, status.AsError())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
// Add SimulateRemoveTask function
|
||||
ssn.AddSimulateRemoveTaskFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToRemove *api.TaskInfo, nodeInfo *api.NodeInfo) error {
|
||||
podInfoToRemove, err := k8sframework.NewPodInfo(taskToRemove.Pod)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create pod info: %w", err)
|
||||
}
|
||||
|
||||
k8sNodeInfo := k8sframework.NewNodeInfo(nodeInfo.Pods()...)
|
||||
k8sNodeInfo.SetNode(nodeInfo.Node)
|
||||
|
||||
if predicate.podAffinityEnable {
|
||||
isSkipInterPodAffinity := handleSkipPredicatePlugin(cycleState, podAffinityFilter.Name())
|
||||
if !isSkipInterPodAffinity {
|
||||
status := podAffinityFilter.RemovePod(ctx, cycleState, taskToSchedule.Pod, podInfoToRemove, k8sNodeInfo)
|
||||
if !status.IsSuccess() {
|
||||
return fmt.Errorf("failed to remove pod from node %s: %w", nodeInfo.Name, status.AsError())
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Add SimulatePredicate function
|
||||
ssn.AddSimulatePredicateFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, task *api.TaskInfo, node *api.NodeInfo) error {
|
||||
k8sNodeInfo := k8sframework.NewNodeInfo(node.Pods()...)
|
||||
k8sNodeInfo.SetNode(node.Node)
|
||||
|
||||
if predicate.podAffinityEnable {
|
||||
isSkipInterPodAffinity := handleSkipPredicatePlugin(cycleState, podAffinityFilter.Name())
|
||||
if !isSkipInterPodAffinity {
|
||||
status := podAffinityFilter.Filter(ctx, cycleState, task.Pod, k8sNodeInfo)
|
||||
|
||||
if !status.IsSuccess() {
|
||||
return fmt.Errorf("failed to filter pod on node %s: %w", node.Name, status.AsError())
|
||||
} else {
|
||||
klog.Infof("pod affinity for task %s/%s filter success on node %s", task.Namespace, task.Name, node.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (pp *predicatesPlugin) runReservePlugins(ssn *framework.Session, event *framework.Event) {
|
||||
|
|
|
@ -253,7 +253,8 @@ func TestPodAntiAffinity(t *testing.T) {
|
|||
}
|
||||
test.PriClass = []*schedulingv1.PriorityClass{highPrio, lowPrio}
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
test.RegisterSession(tiers, nil)
|
||||
test.RegisterSession(tiers, []conf.Configuration{{Name: actions[1].Name(),
|
||||
Arguments: map[string]interface{}{preempt.EnableTopologyAwarePreemptionKey: true}}})
|
||||
defer test.Close()
|
||||
test.Run(actions)
|
||||
if err := test.CheckAll(i); err != nil {
|
||||
|
|
|
@ -17,11 +17,14 @@ limitations under the License.
|
|||
package proportion
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/equality"
|
||||
"k8s.io/klog/v2"
|
||||
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
|
||||
"volcano.sh/apis/pkg/apis/scheduling"
|
||||
"volcano.sh/volcano/pkg/scheduler/api"
|
||||
|
@ -32,7 +35,10 @@ import (
|
|||
)
|
||||
|
||||
// PluginName indicates name of volcano scheduler plugin.
|
||||
const PluginName = "proportion"
|
||||
const (
|
||||
PluginName = "proportion"
|
||||
proportionStateKey = "proportionState"
|
||||
)
|
||||
|
||||
type proportionPlugin struct {
|
||||
totalResource *api.Resource
|
||||
|
@ -119,6 +125,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
|
|||
}
|
||||
realCapability := api.ExceededPart(pp.totalResource, pp.totalGuarantee).Add(attr.guarantee)
|
||||
if attr.capability == nil {
|
||||
attr.capability = api.EmptyResource()
|
||||
attr.realCapability = realCapability
|
||||
} else {
|
||||
realCapability.MinDimensionResource(attr.capability, api.Infinity)
|
||||
|
@ -318,12 +325,49 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
|
|||
ssn.AddAllocatableFn(pp.Name(), func(queue *api.QueueInfo, candidate *api.TaskInfo) bool {
|
||||
return queueAllocatable(queue, candidate)
|
||||
})
|
||||
|
||||
ssn.AddSimulateAllocatableFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, queue *api.QueueInfo, candidate *api.TaskInfo) bool {
|
||||
state, err := getProportionState(cycleState)
|
||||
if err != nil {
|
||||
klog.Errorf("getProportionState error: %v", err)
|
||||
return false
|
||||
}
|
||||
attr := state.queueAttrs[queue.UID]
|
||||
if attr == nil {
|
||||
klog.Errorf("queue %v not found", queue.Name)
|
||||
return false
|
||||
}
|
||||
|
||||
futureUsed := attr.allocated.Clone().Add(candidate.Resreq)
|
||||
allocatable := futureUsed.LessEqualWithDimension(attr.deserved, candidate.Resreq)
|
||||
if !allocatable {
|
||||
klog.V(3).Infof("Queue <%v>: deserved <%v>, allocated <%v>; Candidate <%v>: resource request <%v>",
|
||||
queue.Name, attr.deserved, attr.allocated, candidate.Name, candidate.Resreq)
|
||||
}
|
||||
|
||||
return allocatable
|
||||
})
|
||||
|
||||
ssn.AddPreemptiveFn(pp.Name(), func(obj interface{}, candidate interface{}) bool {
|
||||
queue := obj.(*api.QueueInfo)
|
||||
task := candidate.(*api.TaskInfo)
|
||||
return queueAllocatable(queue, task)
|
||||
})
|
||||
|
||||
ssn.AddPrePredicateFn(pp.Name(), func(task *api.TaskInfo) error {
|
||||
state := &proportionState{
|
||||
queueAttrs: make(map[api.QueueID]*queueAttr),
|
||||
}
|
||||
|
||||
for _, queue := range pp.queueOpts {
|
||||
state.queueAttrs[queue.queueID] = queue.Clone()
|
||||
}
|
||||
|
||||
ssn.GetCycleState(task.UID).Write(proportionStateKey, state)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
ssn.AddJobEnqueueableFn(pp.Name(), func(obj interface{}) int {
|
||||
job := obj.(*api.JobInfo)
|
||||
queueID := job.Queue
|
||||
|
@ -364,6 +408,38 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
|
|||
return util.Reject
|
||||
})
|
||||
|
||||
ssn.AddSimulateAddTaskFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToAdd *api.TaskInfo, nodeInfo *api.NodeInfo) error {
|
||||
state, err := getProportionState(cycleState)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get capacity state: %w", err)
|
||||
}
|
||||
|
||||
job := ssn.Jobs[taskToAdd.Job]
|
||||
attr := state.queueAttrs[job.Queue]
|
||||
if attr == nil {
|
||||
return fmt.Errorf("queue %s not found", job.Queue)
|
||||
}
|
||||
attr.allocated.Add(taskToAdd.Resreq)
|
||||
updateQueueAttrShare(attr)
|
||||
return nil
|
||||
})
|
||||
|
||||
ssn.AddSimulateRemoveTaskFn(pp.Name(), func(ctx context.Context, cycleState *k8sframework.CycleState, taskToSchedule *api.TaskInfo, taskToRemove *api.TaskInfo, nodeInfo *api.NodeInfo) error {
|
||||
state, err := getProportionState(cycleState)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get capacity state: %w", err)
|
||||
}
|
||||
|
||||
job := ssn.Jobs[taskToRemove.Job]
|
||||
attr := state.queueAttrs[job.Queue]
|
||||
if attr == nil {
|
||||
return fmt.Errorf("queue %s not found", job.Queue)
|
||||
}
|
||||
attr.allocated.Sub(taskToRemove.Resreq)
|
||||
updateQueueAttrShare(attr)
|
||||
return nil
|
||||
})
|
||||
|
||||
// Register event handlers.
|
||||
ssn.AddEventHandler(&framework.EventHandler{
|
||||
AllocateFunc: func(event *framework.Event) {
|
||||
|
@ -398,6 +474,69 @@ func (pp *proportionPlugin) OnSessionClose(ssn *framework.Session) {
|
|||
}
|
||||
|
||||
func (pp *proportionPlugin) updateShare(attr *queueAttr) {
|
||||
updateQueueAttrShare(attr)
|
||||
metrics.UpdateQueueShare(attr.name, attr.share)
|
||||
}
|
||||
|
||||
type proportionState struct {
|
||||
queueAttrs map[api.QueueID]*queueAttr
|
||||
}
|
||||
|
||||
func (qa *queueAttr) Clone() *queueAttr {
|
||||
if qa == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Clone basic attributes
|
||||
return &queueAttr{
|
||||
queueID: qa.queueID,
|
||||
name: qa.name,
|
||||
weight: qa.weight,
|
||||
share: qa.share,
|
||||
|
||||
deserved: qa.deserved.Clone(),
|
||||
allocated: qa.allocated.Clone(),
|
||||
request: qa.request.Clone(),
|
||||
elastic: qa.elastic.Clone(),
|
||||
inqueue: qa.inqueue.Clone(),
|
||||
capability: qa.capability.Clone(),
|
||||
realCapability: qa.realCapability.Clone(),
|
||||
guarantee: qa.guarantee.Clone(),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *proportionState) Clone() k8sframework.StateData {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
newState := &proportionState{
|
||||
queueAttrs: make(map[api.QueueID]*queueAttr, len(s.queueAttrs)),
|
||||
}
|
||||
|
||||
// Clone all queue attributes
|
||||
for qID, qa := range s.queueAttrs {
|
||||
newState.queueAttrs[qID] = qa.Clone()
|
||||
}
|
||||
|
||||
return newState
|
||||
}
|
||||
|
||||
func getProportionState(cycleState *k8sframework.CycleState) (*proportionState, error) {
|
||||
c, err := cycleState.Read(proportionStateKey)
|
||||
if err != nil {
|
||||
// preFilterState doesn't exist, likely PreFilter wasn't invoked.
|
||||
return nil, fmt.Errorf("error reading %q from cycleState: %w", proportionStateKey, err)
|
||||
}
|
||||
|
||||
s, ok := c.(*proportionState)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%+v convert to proportion.state error", c)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func updateQueueAttrShare(attr *queueAttr) {
|
||||
res := float64(0)
|
||||
|
||||
// TODO(k82cn): how to handle fragment issues?
|
||||
|
@ -409,5 +548,4 @@ func (pp *proportionPlugin) updateShare(attr *queueAttr) {
|
|||
}
|
||||
|
||||
attr.share = res
|
||||
metrics.UpdateQueueShare(attr.name, attr.share)
|
||||
}
|
||||
|
|
|
@ -43,6 +43,9 @@ func BuildNode(name string, alloc v1.ResourceList, labels map[string]string) *v1
|
|||
Status: v1.NodeStatus{
|
||||
Capacity: alloc,
|
||||
Allocatable: alloc,
|
||||
Conditions: []v1.NodeCondition{
|
||||
{Type: v1.NodeReady, Status: v1.ConditionTrue},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
. "github.com/onsi/gomega"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
|
@ -456,4 +457,103 @@ var _ = Describe("Job E2E Test", func() {
|
|||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should preempt low priority pod with anti-affinity constraint", func() {
|
||||
// Remove enqueue action first because it conflicts with preempt.
|
||||
cmc := e2eutil.NewConfigMapCase("volcano-system", "integration-scheduler-configmap")
|
||||
modifier := func(sc *e2eutil.SchedulerConfiguration) bool {
|
||||
newActions := strings.TrimPrefix(sc.Actions, "enqueue, ")
|
||||
if newActions == sc.Actions {
|
||||
klog.Warning("There is already no enqueue action")
|
||||
return false
|
||||
}
|
||||
|
||||
sc.Configurations = append(sc.Configurations, e2eutil.Configuration{
|
||||
Name: "preempt",
|
||||
Arguments: map[string]string{
|
||||
"enableTopologyAwarePreemption": "true",
|
||||
},
|
||||
})
|
||||
sc.Actions = newActions
|
||||
return true
|
||||
}
|
||||
cmc.ChangeBy(func(data map[string]string) (changed bool, changedBefore map[string]string) {
|
||||
return e2eutil.ModifySchedulerConfig(data, modifier)
|
||||
})
|
||||
defer cmc.UndoChanged()
|
||||
|
||||
ctx = e2eutil.InitTestContext(e2eutil.Options{
|
||||
PriorityClasses: map[string]int32{
|
||||
highPriority: highPriorityValue,
|
||||
lowPriority: lowPriorityValue,
|
||||
},
|
||||
})
|
||||
|
||||
slot := e2eutil.OneCPU
|
||||
rep := e2eutil.ClusterSize(ctx, slot)
|
||||
|
||||
// Create low priority pod with specific label
|
||||
lowPriorityLabels := map[string]string{
|
||||
"app": "test-app",
|
||||
}
|
||||
job := &e2eutil.JobSpec{
|
||||
Tasks: []e2eutil.TaskSpec{
|
||||
{
|
||||
Img: e2eutil.DefaultNginxImage,
|
||||
Req: slot,
|
||||
Min: 1,
|
||||
Rep: rep,
|
||||
Labels: map[string]string{
|
||||
schedulingv1beta1.PodPreemptable: "true",
|
||||
"app": "test-app",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
job.Name = "low-priority-job"
|
||||
job.Pri = lowPriority
|
||||
lowPriorityJob := e2eutil.CreateJob(ctx, job)
|
||||
err := e2eutil.WaitTasksReady(ctx, lowPriorityJob, int(rep))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Create high priority pod with anti-affinity rule
|
||||
highPriorityJob := &e2eutil.JobSpec{
|
||||
Tasks: []e2eutil.TaskSpec{
|
||||
{
|
||||
Img: e2eutil.DefaultNginxImage,
|
||||
Req: slot,
|
||||
Min: rep / 2,
|
||||
Rep: rep,
|
||||
Labels: map[string]string{
|
||||
schedulingv1beta1.PodPreemptable: "true",
|
||||
},
|
||||
Affinity: &corev1.Affinity{
|
||||
PodAntiAffinity: &corev1.PodAntiAffinity{
|
||||
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
|
||||
{
|
||||
LabelSelector: &metav1.LabelSelector{
|
||||
MatchLabels: lowPriorityLabels,
|
||||
},
|
||||
TopologyKey: "kubernetes.io/hostname",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
highPriorityJob.Name = "high-priority-job"
|
||||
highPriorityJob.Pri = highPriority
|
||||
highPriorityJobCreated := e2eutil.CreateJob(ctx, highPriorityJob)
|
||||
|
||||
// Verify high priority pod is successfully scheduled
|
||||
err = e2eutil.WaitTasksReady(ctx, highPriorityJobCreated, int(rep)/2)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Verify low priority pod is preempted
|
||||
err = e2eutil.WaitTasksReady(ctx, lowPriorityJob, int(rep)/2)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue