Compare commits
10 Commits
Author | SHA1 | Date |
---|---|---|
|
04d1afc71d | |
|
6434092d7c | |
|
0aed15c49f | |
|
3080740142 | |
|
f186d9c6de | |
|
46f6f7cba0 | |
|
15294dac0e | |
|
a6a4afb668 | |
|
fad3edc7f3 | |
|
31214b1d27 |
|
@ -15,7 +15,7 @@ You can modify the deployment method according to different environments.
|
|||
|
||||
## 1. Deploy the Karmada
|
||||
|
||||
Suggest `Karmada` Version: **master**
|
||||
Suggest `Karmada` Version: **v1.13.0-beta.0 or higher**
|
||||
|
||||
Follow the [karmada get started guide](https://karmada.io/docs/get-started/nginx-example) to deploy `Karmada`.
|
||||
|
||||
|
@ -31,7 +31,7 @@ cd karmada
|
|||
|
||||
## 2. Deploy the Volcano to member clusters
|
||||
|
||||
Suggest `Volcano` Version: **1.10.0**
|
||||
Suggest `Volcano` Version: **1.10.0 or higher**
|
||||
|
||||
Follow the [volcano installation guide](https://volcano.sh/en/docs/v1-9-0/installation/) to deploy `Volcano` to the member clusters.
|
||||
|
||||
|
@ -69,21 +69,6 @@ kubectl --context karmada-host annotate secret karmada-webhook-config \
|
|||
|
||||
## 4. Deploy the volcano-global controller and webhook manager at Karmada control plane cluster
|
||||
|
||||
You need to build the images on the root direction of the project.
|
||||
|
||||
```bash
|
||||
git clone https://github.com/volcano-sh/volcano-global.git
|
||||
|
||||
cd volcano-global
|
||||
|
||||
# Build the components.
|
||||
TAG=1.0 make images
|
||||
|
||||
# Load the image to karmada host cluster.
|
||||
kind load docker-image --name karmada-host volcanosh/volcano-global-controller-manager:1.0
|
||||
kind load docker-image --name karmada-host volcanosh/volcano-global-webhook-manager:1.0
|
||||
```
|
||||
|
||||
```bash
|
||||
# Switch to Karmada host kubeconfig.
|
||||
export KUBECONFIG=$HOME/.kube/karmada.config
|
||||
|
|
|
@ -16,7 +16,7 @@ spec:
|
|||
automountServiceAccountToken: false
|
||||
containers:
|
||||
- name: volcano-global-controllers-manager
|
||||
image: volcanosh/volcano-global-controller-manager:1.0
|
||||
image: volcanosh/volcano-global-controller-manager:latest
|
||||
args:
|
||||
- --kubeconfig=/etc/kubeconfig/karmada.config
|
||||
- --leader-elect=false
|
||||
|
@ -26,7 +26,7 @@ spec:
|
|||
- --dispatch-period=1s
|
||||
- -v=5
|
||||
- 2>&1
|
||||
imagePullPolicy: Never
|
||||
imagePullPolicy: IfNotPresent
|
||||
volumeMounts:
|
||||
- name: webhook-config
|
||||
mountPath: /etc/kubeconfig
|
||||
|
|
|
@ -31,8 +31,8 @@ spec:
|
|||
- --port=8443
|
||||
- -v=5
|
||||
- 2>&1
|
||||
image: volcanosh/volcano-global-webhook-manager:1.0
|
||||
imagePullPolicy: Never
|
||||
image: volcanosh/volcano-global-webhook-manager:latest
|
||||
imagePullPolicy: IfNotPresent
|
||||
volumeMounts:
|
||||
- mountPath: /admission.local.config/certificates
|
||||
name: admission-certs
|
||||
|
@ -78,8 +78,8 @@ spec:
|
|||
restartPolicy: Never
|
||||
containers:
|
||||
- name: main
|
||||
image: volcanosh/volcano-global-webhook-manager:1.0
|
||||
imagePullPolicy: Never
|
||||
image: volcanosh/volcano-global-webhook-manager:latest
|
||||
imagePullPolicy: IfNotPresent
|
||||
command: ["./gen-admission-secret.sh", "--service", "volcano-global-webhook", "--namespace",
|
||||
"volcano-global", "--secret", "volcano-global-webhook-cert"]
|
||||
---
|
||||
|
|
12
go.mod
12
go.mod
|
@ -1,9 +1,9 @@
|
|||
module volcano.sh/volcano-global
|
||||
|
||||
go 1.22.9
|
||||
go 1.22.11
|
||||
|
||||
require (
|
||||
github.com/karmada-io/karmada v1.13.0-alpha.2.0.20250118102547-5bfdf615c712
|
||||
github.com/karmada-io/karmada v1.13.0-beta.0
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.9.0
|
||||
gomodules.xyz/jsonpatch/v2 v2.4.0
|
||||
|
@ -58,11 +58,11 @@ require (
|
|||
github.com/prometheus/procfs v0.15.1 // indirect
|
||||
github.com/spf13/cobra v1.8.1 // indirect
|
||||
github.com/x448/float16 v0.8.4 // indirect
|
||||
golang.org/x/net v0.28.0 // indirect
|
||||
golang.org/x/net v0.34.0 // indirect
|
||||
golang.org/x/oauth2 v0.21.0 // indirect
|
||||
golang.org/x/sys v0.23.0 // indirect
|
||||
golang.org/x/term v0.23.0 // indirect
|
||||
golang.org/x/text v0.17.0 // indirect
|
||||
golang.org/x/sys v0.29.0 // indirect
|
||||
golang.org/x/term v0.28.0 // indirect
|
||||
golang.org/x/text v0.21.0 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect
|
||||
google.golang.org/protobuf v1.34.2 // indirect
|
||||
|
|
28
go.sum
28
go.sum
|
@ -95,8 +95,8 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
|
|||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/karmada-io/karmada v1.13.0-alpha.2.0.20250118102547-5bfdf615c712 h1:o3xSYSPVtififOUUfRJV96iWkKEhIZbf2d3+UAgmX+4=
|
||||
github.com/karmada-io/karmada v1.13.0-alpha.2.0.20250118102547-5bfdf615c712/go.mod h1:ccoLWknjS1hHmBuAlRBP6Y55kLGe39GpTSs9LAW+7Wc=
|
||||
github.com/karmada-io/karmada v1.13.0-beta.0 h1:1hdIYTC6SstBC3/85b6+CiCLufd73WvDTTupRA55Y2o=
|
||||
github.com/karmada-io/karmada v1.13.0-beta.0/go.mod h1:1z5VJPUWPwfjZJnXJiIBJFjlZc/BTuD+5z1jJbFfHBo=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
|
@ -201,8 +201,8 @@ go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
|
|||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
|
||||
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
|
||||
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
|
||||
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
|
||||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
|
||||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
|
@ -211,26 +211,26 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
|
|||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
|
||||
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
|
||||
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
|
||||
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
|
||||
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
|
||||
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
|
||||
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
|
||||
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
|
||||
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
|
||||
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
|
||||
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg=
|
||||
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
|
||||
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
|
||||
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
|
||||
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
|
||||
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
|
||||
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
FROM golang:1.22.9 AS builder
|
||||
FROM golang:1.22.11 AS builder
|
||||
WORKDIR /go/src/volcano.sh/
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
FROM golang:1.22.9 AS builder
|
||||
FROM golang:1.22.11 AS builder
|
||||
WORKDIR /go/src/volcano.sh/
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
FROM golang:1.22.9 AS builder
|
||||
FROM golang:1.22.11 AS builder
|
||||
WORKDIR /go/src/volcano.sh/
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
|
|
@ -19,6 +19,7 @@ package api
|
|||
import (
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
volcanoapi "volcano.sh/volcano/pkg/scheduler/api"
|
||||
)
|
||||
|
||||
type DispatchStatus int16
|
||||
|
@ -35,6 +36,7 @@ type ResourceBindingInfo struct {
|
|||
Queue string
|
||||
PriorityClassName string
|
||||
DispatchStatus DispatchStatus
|
||||
ResReq *volcanoapi.Resource
|
||||
|
||||
// Update it when snapshot.
|
||||
Priority int32
|
||||
|
@ -47,6 +49,7 @@ func (rbi *ResourceBindingInfo) DeepCopy() *ResourceBindingInfo {
|
|||
Queue: rbi.Queue,
|
||||
PriorityClassName: rbi.PriorityClassName,
|
||||
DispatchStatus: rbi.DispatchStatus,
|
||||
ResReq: rbi.ResReq.Clone(),
|
||||
|
||||
Priority: rbi.Priority,
|
||||
}
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
Copyright 2025 The Volcano Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
volcanoapi "volcano.sh/volcano/pkg/scheduler/api"
|
||||
)
|
||||
|
||||
type AllocatableFn func(qi *volcanoapi.QueueInfo, rbi *ResourceBindingInfo) bool
|
|
@ -20,9 +20,11 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
|
||||
karmadainformerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
|
||||
informerclusterv1alpha1 "github.com/karmada-io/karmada/pkg/generated/informers/externalversions/cluster/v1alpha1"
|
||||
informerworkv1aplha2 "github.com/karmada-io/karmada/pkg/generated/informers/externalversions/work/v1alpha2"
|
||||
schedulingv1 "k8s.io/api/scheduling/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
|
@ -83,6 +85,10 @@ type DispatcherCache struct {
|
|||
// resourceBindingInfos[namespace][name] = target ResourceBindingInfo.
|
||||
resourceBindingInfos map[string]map[string]*api.ResourceBindingInfo
|
||||
|
||||
clusterInformer informerclusterv1alpha1.ClusterInformer
|
||||
// clusters[name] = target Cluster
|
||||
clusters map[string]*clusterv1alpha1.Cluster
|
||||
|
||||
// Its queue for unsuspend the ResourceBinding, when a ResourceBinding finish dispatch,
|
||||
// The Dispatcher will add a task to here, and update the ResourceBinding.spec.Suspend = false.
|
||||
unSuspendRBTaskQueue workqueue.Interface
|
||||
|
@ -135,6 +141,8 @@ func NewDispatcherCache(option *DispatcherCacheOption) DispatcherCacheInterface
|
|||
|
||||
resourceBindingInfos: map[string]map[string]*api.ResourceBindingInfo{},
|
||||
|
||||
clusters: map[string]*clusterv1alpha1.Cluster{},
|
||||
|
||||
unSuspendRBTaskQueue: workqueue.New(),
|
||||
}
|
||||
|
||||
|
@ -158,6 +166,12 @@ func NewDispatcherCache(option *DispatcherCacheOption) DispatcherCacheInterface
|
|||
UpdateFunc: sc.updateResourceBinding,
|
||||
DeleteFunc: sc.deleteResourceBinding,
|
||||
})
|
||||
sc.clusterInformer = sc.karmadaInformerFactor.Cluster().V1alpha1().Clusters()
|
||||
sc.clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: sc.addCluster,
|
||||
UpdateFunc: sc.updateCluster,
|
||||
DeleteFunc: sc.deleteCluster,
|
||||
})
|
||||
|
||||
return sc
|
||||
}
|
||||
|
|
|
@ -19,7 +19,9 @@ package cache
|
|||
import (
|
||||
"context"
|
||||
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
schedulingv1 "k8s.io/api/scheduling/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
@ -27,8 +29,10 @@ import (
|
|||
"volcano.sh/apis/pkg/apis/scheduling"
|
||||
"volcano.sh/apis/pkg/apis/scheduling/scheme"
|
||||
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
|
||||
volcanoapi "volcano.sh/volcano/pkg/scheduler/api"
|
||||
|
||||
"volcano.sh/volcano-global/pkg/dispatcher/api"
|
||||
"volcano.sh/volcano-global/pkg/utils"
|
||||
"volcano.sh/volcano-global/pkg/workload"
|
||||
)
|
||||
|
||||
|
@ -38,17 +42,17 @@ func (dc *DispatcherCache) addQueue(obj interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
// Convert the queue from v1beta1 to v1
|
||||
v1queue := &scheduling.Queue{}
|
||||
if err := scheme.Scheme.Convert(queue, v1queue, nil); err != nil {
|
||||
klog.Errorf("Failed to convert queue from %T to %T", queue, v1queue)
|
||||
// Convert the queue from v1beta1 to internal
|
||||
internalQueue := &scheduling.Queue{}
|
||||
if err := scheme.Scheme.Convert(queue, internalQueue, nil); err != nil {
|
||||
klog.Errorf("Failed to convert queue from %T to %T", queue, internalQueue)
|
||||
return
|
||||
}
|
||||
|
||||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
dc.queues[queue.Name] = schedulingapi.NewQueueInfo(v1queue)
|
||||
dc.setQueue(internalQueue)
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) deleteQueue(obj interface{}) {
|
||||
|
@ -68,9 +72,25 @@ func (dc *DispatcherCache) updateQueue(oldObj, newObj interface{}) {
|
|||
if oldQueue == nil || newQueue == nil {
|
||||
return
|
||||
}
|
||||
if oldQueue.ResourceVersion == newQueue.ResourceVersion {
|
||||
return
|
||||
}
|
||||
|
||||
dc.deleteQueue(oldQueue)
|
||||
dc.addQueue(newQueue)
|
||||
// Convert the queue from v1beta1 to internal
|
||||
internalQueue := &scheduling.Queue{}
|
||||
if err := scheme.Scheme.Convert(newQueue, internalQueue, nil); err != nil {
|
||||
klog.Errorf("Failed to convert queue from %T to %T", newQueue, internalQueue)
|
||||
return
|
||||
}
|
||||
|
||||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
dc.setQueue(internalQueue)
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) setQueue(queue *scheduling.Queue) {
|
||||
dc.queues[queue.Name] = schedulingapi.NewQueueInfo(queue)
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) addPriorityClass(obj interface{}) {
|
||||
|
@ -81,12 +101,7 @@ func (dc *DispatcherCache) addPriorityClass(obj interface{}) {
|
|||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
if pc.GlobalDefault {
|
||||
klog.V(3).Infof("Set default PriorityClass to <%s>, Priority <%d>.", pc.Name, pc.Value)
|
||||
dc.defaultPriorityClass = pc
|
||||
}
|
||||
|
||||
dc.priorityClasses[pc.Name] = pc
|
||||
dc.setPriorityClass(pc)
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) deletePriorityClass(obj interface{}) {
|
||||
|
@ -94,6 +109,7 @@ func (dc *DispatcherCache) deletePriorityClass(obj interface{}) {
|
|||
if pc == nil {
|
||||
return
|
||||
}
|
||||
|
||||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
|
@ -110,8 +126,27 @@ func (dc *DispatcherCache) updatePriorityClass(oldObj, newObj interface{}) {
|
|||
if oldPc == nil || newPc == nil {
|
||||
return
|
||||
}
|
||||
dc.deletePriorityClass(oldPc)
|
||||
dc.addPriorityClass(newPc)
|
||||
if oldPc.ResourceVersion == newPc.ResourceVersion {
|
||||
return
|
||||
}
|
||||
|
||||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
if oldPc.GlobalDefault {
|
||||
klog.V(5).Infof("Delete default PriorityClass <%s>, Priority <%d>.", oldPc.Name, oldPc.Value)
|
||||
dc.defaultPriorityClass = nil
|
||||
}
|
||||
|
||||
dc.setPriorityClass(newPc)
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) setPriorityClass(pc *schedulingv1.PriorityClass) {
|
||||
if pc.GlobalDefault {
|
||||
klog.V(3).Infof("Set default PriorityClass to <%s>, Priority <%d>.", pc.Name, pc.Value)
|
||||
dc.defaultPriorityClass = pc
|
||||
}
|
||||
dc.priorityClasses[pc.Name] = pc
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) addResourceBinding(obj interface{}) {
|
||||
|
@ -120,6 +155,48 @@ func (dc *DispatcherCache) addResourceBinding(obj interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
dc.setResourceBinding(rb)
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) deleteResourceBinding(obj interface{}) {
|
||||
rb := convertToResourceBinding(obj)
|
||||
if rb == nil {
|
||||
return
|
||||
}
|
||||
|
||||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
if dc.resourceBindings[rb.Namespace] == nil {
|
||||
klog.Errorf("Failed to delete ResourceBinding <%s/%s>, the Resourcebinding's "+
|
||||
"Namespace is not in the cache.", rb.Namespace, rb.Name)
|
||||
return
|
||||
} else {
|
||||
delete(dc.resourceBindings[rb.Namespace], rb.Name)
|
||||
delete(dc.resourceBindingInfos[rb.Namespace], rb.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) updateResourceBinding(oldObj, newObj interface{}) {
|
||||
oldRb := convertToResourceBinding(oldObj)
|
||||
newRb := convertToResourceBinding(newObj)
|
||||
if oldRb == nil || newRb == nil {
|
||||
return
|
||||
}
|
||||
if oldRb.ResourceVersion == newRb.ResourceVersion {
|
||||
return
|
||||
}
|
||||
|
||||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
dc.setResourceBinding(newRb)
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) setResourceBinding(rb *workv1alpha2.ResourceBinding) {
|
||||
// Check if its workload, skip add to cache if not.
|
||||
isWorkload, newWorkloadFunc, err := workload.TryGetNewWorkloadFunc(rb.Spec.Resource)
|
||||
if err != nil {
|
||||
|
@ -145,10 +222,6 @@ func (dc *DispatcherCache) addResourceBinding(obj interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
// Add the ResourceBinding to cache.
|
||||
if dc.resourceBindings[rb.Namespace] == nil {
|
||||
dc.resourceBindings[rb.Namespace] = map[string]*workv1alpha2.ResourceBinding{
|
||||
rb.Name: rb,
|
||||
|
@ -165,6 +238,13 @@ func (dc *DispatcherCache) addResourceBinding(obj interface{}) {
|
|||
PriorityClassName: workload.PriorityClassName(),
|
||||
DispatchStatus: api.UnSuspended,
|
||||
}
|
||||
|
||||
resReq := volcanoapi.EmptyResource()
|
||||
if rb.Spec.ReplicaRequirements != nil {
|
||||
resReq = volcanoapi.NewResource(rb.Spec.ReplicaRequirements.ResourceRequest).Multi(float64(rb.Spec.Replicas))
|
||||
}
|
||||
newResourceBindingInfo.ResReq = resReq
|
||||
|
||||
// Currently, our failurePolicy is set to Fail, which ensures that no unexpected ResourceBindings will exist.
|
||||
// When a ResourceBinding is created, it will definitely be updated to Suspend, so we don't need to check the Status,
|
||||
// so rb should be suspended normally.
|
||||
|
@ -181,33 +261,52 @@ func (dc *DispatcherCache) addResourceBinding(obj interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) deleteResourceBinding(obj interface{}) {
|
||||
rb := convertToResourceBinding(obj)
|
||||
if rb == nil {
|
||||
func (dc *DispatcherCache) addCluster(obj interface{}) {
|
||||
cluster := convertToCluster(obj)
|
||||
if cluster == nil {
|
||||
return
|
||||
}
|
||||
if ready, message := utils.CheckClusterReady(cluster); !ready {
|
||||
klog.V(5).Info(message)
|
||||
return
|
||||
}
|
||||
|
||||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
if dc.resourceBindings[rb.Namespace] == nil {
|
||||
klog.Errorf("Failed to delete ResourceBinding <%s/%s>, the Resourcebinding's "+
|
||||
"Namespace is not in the cache.", rb.Namespace, rb.Name)
|
||||
return
|
||||
} else {
|
||||
delete(dc.resourceBindings[rb.Namespace], rb.Name)
|
||||
delete(dc.resourceBindingInfos[rb.Namespace], rb.Name)
|
||||
}
|
||||
dc.setCluster(cluster)
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) updateResourceBinding(oldObj, newObj interface{}) {
|
||||
oldRb := convertToResourceBinding(oldObj)
|
||||
newRb := convertToResourceBinding(newObj)
|
||||
if oldRb == nil || newRb == nil {
|
||||
func (dc *DispatcherCache) deleteCluster(obj interface{}) {
|
||||
cluster := convertToCluster(obj)
|
||||
if cluster == nil {
|
||||
return
|
||||
}
|
||||
|
||||
dc.deleteResourceBinding(oldRb)
|
||||
dc.addResourceBinding(newRb)
|
||||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
delete(dc.clusters, cluster.Name)
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) updateCluster(oldObj, newObj interface{}) {
|
||||
oldCluster := convertToCluster(oldObj)
|
||||
newCluster := convertToCluster(newObj)
|
||||
if oldCluster == nil || newCluster == nil {
|
||||
return
|
||||
}
|
||||
if oldCluster.ResourceVersion == newCluster.ResourceVersion {
|
||||
return
|
||||
}
|
||||
|
||||
dc.mutex.Lock()
|
||||
defer dc.mutex.Unlock()
|
||||
|
||||
dc.setCluster(newCluster)
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) setCluster(cluster *clusterv1alpha1.Cluster) {
|
||||
dc.clusters[cluster.Name] = cluster
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) getResourceFromObjectReference(ref workv1alpha2.ObjectReference) (*unstructured.Unstructured, error) {
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package cache
|
||||
|
||||
import (
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
schedulingv1 "k8s.io/api/scheduling/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
@ -31,7 +32,7 @@ func convertToQueue(obj interface{}) *schedulingv1beta1.Queue {
|
|||
|
||||
queue, ok := obj.(*schedulingv1beta1.Queue)
|
||||
if !ok {
|
||||
klog.Errorf("Cant Convert obj to *schedulingv1beta1.Queue, obj: %v", obj)
|
||||
klog.Errorf("Can't Convert obj to *schedulingv1beta1.Queue, obj: %v", obj)
|
||||
return nil
|
||||
}
|
||||
return queue
|
||||
|
@ -44,7 +45,7 @@ func convertToPriorityClass(obj interface{}) *schedulingv1.PriorityClass {
|
|||
|
||||
priorityClass, ok := obj.(*schedulingv1.PriorityClass)
|
||||
if !ok {
|
||||
klog.Errorf("Cant Convert obj to *schedulingv1.PriorityClass, obj: %v", obj)
|
||||
klog.Errorf("Can't Convert obj to *schedulingv1.PriorityClass, obj: %v", obj)
|
||||
return nil
|
||||
}
|
||||
return priorityClass
|
||||
|
@ -57,8 +58,21 @@ func convertToResourceBinding(obj interface{}) *workv1alpha2.ResourceBinding {
|
|||
|
||||
resourceBinding, ok := obj.(*workv1alpha2.ResourceBinding)
|
||||
if !ok {
|
||||
klog.Errorf("Cant Convert obj to *workv1alpha2.ResourceBinding, obj: %v", obj)
|
||||
klog.Errorf("Can't Convert obj to *workv1alpha2.ResourceBinding, obj: %v", obj)
|
||||
return nil
|
||||
}
|
||||
return resourceBinding
|
||||
}
|
||||
|
||||
func convertToCluster(obj interface{}) *clusterv1alpha1.Cluster {
|
||||
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
||||
obj = tombstone.Obj
|
||||
}
|
||||
|
||||
cluster, ok := obj.(*clusterv1alpha1.Cluster)
|
||||
if !ok {
|
||||
klog.Errorf("Can't Convert obj to *clusterv1alpha1.Cluster, obj: %v", obj)
|
||||
return nil
|
||||
}
|
||||
return cluster
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ type DispatcherCacheSnapshot struct {
|
|||
QueueInfos map[string]*schedulingapi.QueueInfo
|
||||
|
||||
ResourceBindingInfos map[types.UID]*api.ResourceBindingInfo
|
||||
|
||||
TotalResource *schedulingapi.Resource
|
||||
}
|
||||
|
||||
func (dc *DispatcherCache) Snapshot() *DispatcherCacheSnapshot {
|
||||
|
@ -40,12 +42,17 @@ func (dc *DispatcherCache) Snapshot() *DispatcherCacheSnapshot {
|
|||
DefaultQueue: dc.defaultQueue,
|
||||
QueueInfos: make(map[string]*schedulingapi.QueueInfo, len(dc.queues)),
|
||||
ResourceBindingInfos: make(map[types.UID]*api.ResourceBindingInfo),
|
||||
TotalResource: schedulingapi.EmptyResource(),
|
||||
}
|
||||
|
||||
for _, queue := range dc.queues {
|
||||
snapshot.QueueInfos[queue.Name] = queue.Clone()
|
||||
}
|
||||
|
||||
for _, cluster := range dc.clusters {
|
||||
snapshot.TotalResource.Add(schedulingapi.NewResource(cluster.Status.ResourceSummary.Allocatable))
|
||||
}
|
||||
|
||||
// Collect the ResourceBindingInfos.
|
||||
// First, we should update the elements of the ResourceBindingInfo,
|
||||
// because we only set some elements of them when creating the ResourceBindingInfo.
|
||||
|
|
|
@ -169,13 +169,25 @@ func (dispatcher *Dispatcher) dispatch(ssn *dispatcherframework.Session) {
|
|||
|
||||
// Get the highest priority ResourceBinding from the queue.
|
||||
rbi := resourceBindings.Pop().(*api.ResourceBindingInfo)
|
||||
|
||||
rbi.DispatchStatus = api.UnSuspending
|
||||
dispatcher.cache.UnSuspendResourceBinding(types.NamespacedName{
|
||||
Namespace: rbi.ResourceBinding.Namespace,
|
||||
Name: rbi.ResourceBinding.Name,
|
||||
})
|
||||
dispatchResourceBindingCount++
|
||||
if ssn.Allocatable(queue, rbi) {
|
||||
if err := ssn.Allocate(rbi); err != nil {
|
||||
klog.Errorf("Failed to allocate ResourceBindingInfo <%s/%s>, err: %v",
|
||||
rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, err)
|
||||
if err = ssn.UnAllocate(rbi); err != nil {
|
||||
klog.Errorf("Failed to unallocate ResourceBindingInfo <%s/%s>, err: %v",
|
||||
rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, err)
|
||||
}
|
||||
} else {
|
||||
rbi.DispatchStatus = api.UnSuspending
|
||||
dispatcher.cache.UnSuspendResourceBinding(types.NamespacedName{
|
||||
Namespace: rbi.ResourceBinding.Namespace,
|
||||
Name: rbi.ResourceBinding.Name,
|
||||
})
|
||||
dispatchResourceBindingCount++
|
||||
}
|
||||
} else {
|
||||
klog.V(3).Infof("Queue <%s> is overused when considering ResourceBindingInfo <%s/%s>, ignore it.", queue.Name, rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name)
|
||||
}
|
||||
|
||||
// Add the queue to roundedQueues if it still has ResourceBinding.
|
||||
if !resourceBindings.Empty() {
|
||||
|
|
|
@ -20,12 +20,14 @@ import (
|
|||
"testing"
|
||||
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling"
|
||||
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
|
||||
|
||||
"volcano.sh/volcano-global/pkg/dispatcher/api"
|
||||
"volcano.sh/volcano-global/pkg/dispatcher/options"
|
||||
"volcano.sh/volcano-global/pkg/dispatcher/uthelper"
|
||||
)
|
||||
|
||||
|
@ -41,8 +43,8 @@ func TestDispatcherRoundRobin(t *testing.T) {
|
|||
return schedulingapi.NewQueueInfo(queue)
|
||||
}
|
||||
|
||||
// buildResoueceBinding return a ResourceBindingInfo
|
||||
buildResoueceBinding := func(name string, namespace string, queueName string) *api.ResourceBindingInfo {
|
||||
// buildResourceBinding return a ResourceBindingInfo
|
||||
buildResourceBinding := func(name string, namespace string, queueName string) *api.ResourceBindingInfo {
|
||||
rb := &workv1alpha2.ResourceBinding{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
|
@ -76,8 +78,8 @@ func TestDispatcherRoundRobin(t *testing.T) {
|
|||
buildQueue("queue1"),
|
||||
},
|
||||
resourceBindingInfos: []*api.ResourceBindingInfo{
|
||||
buildResoueceBinding("rb1", "default", "queue1"),
|
||||
buildResoueceBinding("rb2", "default", "queue1"),
|
||||
buildResourceBinding("rb1", "default", "queue1"),
|
||||
buildResourceBinding("rb2", "default", "queue1"),
|
||||
},
|
||||
expectedOrder: []types.NamespacedName{
|
||||
{Namespace: "default", Name: "rb1"},
|
||||
|
@ -91,12 +93,12 @@ func TestDispatcherRoundRobin(t *testing.T) {
|
|||
buildQueue("queue2"),
|
||||
},
|
||||
resourceBindingInfos: []*api.ResourceBindingInfo{
|
||||
buildResoueceBinding("rb1", "default", "queue1"),
|
||||
buildResoueceBinding("rb2", "default", "queue1"),
|
||||
buildResoueceBinding("rb3", "default", "queue1"),
|
||||
buildResoueceBinding("rb4", "default", "queue2"),
|
||||
buildResoueceBinding("rb5", "default", "queue2"),
|
||||
buildResoueceBinding("rb6", "default", "queue2"),
|
||||
buildResourceBinding("rb1", "default", "queue1"),
|
||||
buildResourceBinding("rb2", "default", "queue1"),
|
||||
buildResourceBinding("rb3", "default", "queue1"),
|
||||
buildResourceBinding("rb4", "default", "queue2"),
|
||||
buildResourceBinding("rb5", "default", "queue2"),
|
||||
buildResourceBinding("rb6", "default", "queue2"),
|
||||
},
|
||||
expectedOrder: []types.NamespacedName{
|
||||
{Namespace: "default", Name: "rb1"},
|
||||
|
@ -126,7 +128,7 @@ func TestDispatcherRoundRobin(t *testing.T) {
|
|||
resourceBindingInfoMap[rbi.ResourceUID] = rbi
|
||||
}
|
||||
fakeCache := &uthelper.FakeDispatcherCache{
|
||||
DefaultQueue: defaultQueue,
|
||||
DefaultQueue: options.DefaultQueue,
|
||||
Queues: queueInfoMap,
|
||||
ResourceBindingInfos: resourceBindingInfoMap,
|
||||
UnSuspendingOrder: []types.NamespacedName{},
|
||||
|
@ -135,7 +137,7 @@ func TestDispatcherRoundRobin(t *testing.T) {
|
|||
// Create a dispatcher
|
||||
dispatcher := &Dispatcher{
|
||||
cache: fakeCache,
|
||||
dispatchPeriod: defaultDispatchPeriod,
|
||||
dispatchPeriod: options.DefaultDispatchPeriod,
|
||||
}
|
||||
|
||||
dispatcher.runOnce()
|
||||
|
@ -161,3 +163,157 @@ func TestDispatcherRoundRobin(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcherCapacity(t *testing.T) {
|
||||
// buildQueue return a scheduling QueueInfo
|
||||
buildQueue := func(name string, capacity corev1.ResourceList) *schedulingapi.QueueInfo {
|
||||
queue := &schedulingv1beta1.Queue{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
CreationTimestamp: metav1.Now(),
|
||||
},
|
||||
Spec: schedulingv1beta1.QueueSpec{
|
||||
Capability: capacity,
|
||||
},
|
||||
}
|
||||
return schedulingapi.NewQueueInfo(queue)
|
||||
}
|
||||
|
||||
// buildResourceBinding return a ResourceBindingInfo
|
||||
buildResourceBinding := func(name string, namespace string, queueName string, replicas int32, resReq corev1.ResourceList, dispatchStatus api.DispatchStatus) *api.ResourceBindingInfo {
|
||||
rb := &workv1alpha2.ResourceBinding{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: namespace,
|
||||
CreationTimestamp: metav1.Now(),
|
||||
},
|
||||
Spec: workv1alpha2.ResourceBindingSpec{
|
||||
Resource: workv1alpha2.ObjectReference{
|
||||
UID: types.UID(name),
|
||||
},
|
||||
Replicas: replicas,
|
||||
ReplicaRequirements: &workv1alpha2.ReplicaRequirements{
|
||||
ResourceRequest: resReq,
|
||||
},
|
||||
},
|
||||
}
|
||||
return &api.ResourceBindingInfo{
|
||||
ResourceBinding: rb,
|
||||
ResourceUID: rb.Spec.Resource.UID,
|
||||
Queue: queueName,
|
||||
DispatchStatus: dispatchStatus,
|
||||
}
|
||||
}
|
||||
|
||||
// Define the tests
|
||||
tests := []struct {
|
||||
name string
|
||||
queueInfos []*schedulingapi.QueueInfo
|
||||
resourceBindingInfos []*api.ResourceBindingInfo
|
||||
// expectedAllocatable are can allocate resourceBindingInfos
|
||||
expectedAllocatable []types.NamespacedName
|
||||
// expectedAllocatable are can not allocate resourceBindingInfos
|
||||
expectedUnAllocatable []types.NamespacedName
|
||||
}{
|
||||
{
|
||||
name: "Test only can allocate ResourceBindingInfo",
|
||||
queueInfos: []*schedulingapi.QueueInfo{
|
||||
buildQueue("queue1", schedulingapi.BuildResourceList("5", "5G")),
|
||||
},
|
||||
resourceBindingInfos: []*api.ResourceBindingInfo{
|
||||
buildResourceBinding("rb1", "default", "queue1", 2, schedulingapi.BuildResourceList("1", "1G"), api.UnSuspended),
|
||||
buildResourceBinding("rb2", "default", "queue1", 2, schedulingapi.BuildResourceList("1", "1G"), api.Suspended),
|
||||
},
|
||||
expectedAllocatable: []types.NamespacedName{
|
||||
{Namespace: "default", Name: "rb2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Test only can not allocate ResourceBindingInfo",
|
||||
queueInfos: []*schedulingapi.QueueInfo{
|
||||
buildQueue("queue1", schedulingapi.BuildResourceList("5", "5G")),
|
||||
},
|
||||
resourceBindingInfos: []*api.ResourceBindingInfo{
|
||||
buildResourceBinding("rb1", "default", "queue1", 2, schedulingapi.BuildResourceList("1", "1G"), api.UnSuspended),
|
||||
buildResourceBinding("rb2", "default", "queue1", 4, schedulingapi.BuildResourceList("1", "1G"), api.Suspended),
|
||||
},
|
||||
expectedUnAllocatable: []types.NamespacedName{
|
||||
{Namespace: "default", Name: "rb2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Test can allocate and can not allocate ResourceBindingInfo",
|
||||
queueInfos: []*schedulingapi.QueueInfo{
|
||||
buildQueue("queue1", schedulingapi.BuildResourceList("5", "5G")),
|
||||
},
|
||||
resourceBindingInfos: []*api.ResourceBindingInfo{
|
||||
buildResourceBinding("rb1", "default", "queue1", 2, schedulingapi.BuildResourceList("1", "1G"), api.UnSuspended),
|
||||
buildResourceBinding("rb2", "default", "queue1", 2, schedulingapi.BuildResourceList("1", "1G"), api.Suspended),
|
||||
buildResourceBinding("rb3", "default", "queue1", 1, schedulingapi.BuildResourceList("2", "2G"), api.Suspended),
|
||||
},
|
||||
expectedAllocatable: []types.NamespacedName{
|
||||
{Namespace: "default", Name: "rb2"},
|
||||
},
|
||||
expectedUnAllocatable: []types.NamespacedName{
|
||||
{Namespace: "default", Name: "rb3"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// testDispatcherCapacity is a helper function to test the capacity capability of dispatcher.
|
||||
testDispatcherCapacity := func(t *testing.T,
|
||||
queueInfos []*schedulingapi.QueueInfo,
|
||||
resourceBindingInfos []*api.ResourceBindingInfo,
|
||||
expectedAllocatable, expectedUnAllocatable []types.NamespacedName,
|
||||
) {
|
||||
// Create a fake dispatcher cache
|
||||
queueInfoMap := make(map[string]*schedulingapi.QueueInfo)
|
||||
for _, queueInfo := range queueInfos {
|
||||
queueInfoMap[queueInfo.Name] = queueInfo
|
||||
}
|
||||
|
||||
resourceBindingInfoMap := make(map[types.UID]*api.ResourceBindingInfo)
|
||||
for _, rbi := range resourceBindingInfos {
|
||||
resourceBindingInfoMap[rbi.ResourceUID] = rbi
|
||||
}
|
||||
fakeCache := &uthelper.FakeDispatcherCache{
|
||||
DefaultQueue: options.DefaultQueue,
|
||||
Queues: queueInfoMap,
|
||||
ResourceBindingInfos: resourceBindingInfoMap,
|
||||
UnSuspendingOrder: []types.NamespacedName{},
|
||||
TotalResource: schedulingapi.NewResource(schedulingapi.BuildResourceList("100", "100G")),
|
||||
}
|
||||
|
||||
// Create a dispatcher
|
||||
dispatcher := &Dispatcher{
|
||||
cache: fakeCache,
|
||||
dispatchPeriod: options.DefaultDispatchPeriod,
|
||||
}
|
||||
|
||||
dispatcher.runOnce()
|
||||
|
||||
// Check the expectedAllocatable
|
||||
for _, nn := range expectedAllocatable {
|
||||
for _, rbi := range fakeCache.ResourceBindingInfos {
|
||||
if nn.Namespace == rbi.ResourceBinding.Namespace && nn.Name == rbi.ResourceBinding.Name && rbi.DispatchStatus != api.UnSuspended {
|
||||
t.Errorf("ResourceBindingInfos <%s/%s> is not expected, expect allocatable", rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Check the expectedUnAllocatable
|
||||
for _, nn := range expectedUnAllocatable {
|
||||
for _, rbi := range fakeCache.ResourceBindingInfos {
|
||||
if nn.Namespace == rbi.ResourceBinding.Namespace && nn.Name == rbi.ResourceBinding.Name && rbi.DispatchStatus != api.Suspended {
|
||||
t.Errorf("ResourceBindingInfos <%s/%s> is not expected, expect unallocatable", rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run the tests
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
testDispatcherCapacity(t, tt.queueInfos, tt.resourceBindingInfos, tt.expectedAllocatable, tt.expectedUnAllocatable)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
Copyright 2025 The Volcano Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package framework
|
||||
|
||||
import (
|
||||
"volcano.sh/volcano-global/pkg/dispatcher/api"
|
||||
)
|
||||
|
||||
// EventHandler structure
|
||||
type EventHandler struct {
|
||||
AllocateFunc func(rbi *api.ResourceBindingInfo) error
|
||||
DeallocateFunc func(rbi *api.ResourceBindingInfo) error
|
||||
}
|
|
@ -36,6 +36,8 @@ type Session struct {
|
|||
plugins map[string]Plugin
|
||||
queueInfoOrderFns map[string]volcanoapi.CompareFn
|
||||
resourceBindingInfoOrderFns map[string]volcanoapi.CompareFn
|
||||
allocatableFns map[string]api.AllocatableFn
|
||||
eventHandlers []*EventHandler
|
||||
}
|
||||
|
||||
func OpenSession(cache dispatchercache.DispatcherCacheInterface) *Session {
|
||||
|
@ -46,6 +48,7 @@ func OpenSession(cache dispatchercache.DispatcherCacheInterface) *Session {
|
|||
plugins: map[string]Plugin{},
|
||||
queueInfoOrderFns: map[string]volcanoapi.CompareFn{},
|
||||
resourceBindingInfoOrderFns: map[string]volcanoapi.CompareFn{},
|
||||
allocatableFns: map[string]api.AllocatableFn{},
|
||||
}
|
||||
|
||||
// Register all the plugins to session.
|
||||
|
|
|
@ -17,6 +17,8 @@ limitations under the License.
|
|||
package framework
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
volcanoapi "volcano.sh/volcano/pkg/scheduler/api"
|
||||
|
||||
"volcano.sh/volcano-global/pkg/dispatcher/api"
|
||||
|
@ -32,6 +34,16 @@ func (ssn *Session) AddQueueInfoOrderFn(name string, compareFn volcanoapi.Compar
|
|||
ssn.queueInfoOrderFns[name] = compareFn
|
||||
}
|
||||
|
||||
// AddAllocatableFn add allocatable function
|
||||
func (ssn *Session) AddAllocatableFn(name string, fn api.AllocatableFn) {
|
||||
ssn.allocatableFns[name] = fn
|
||||
}
|
||||
|
||||
// AddEventHandler add event handlers
|
||||
func (ssn *Session) AddEventHandler(eh *EventHandler) {
|
||||
ssn.eventHandlers = append(ssn.eventHandlers, eh)
|
||||
}
|
||||
|
||||
func (ssn *Session) QueueInfoOrderFn(l, r interface{}) bool {
|
||||
for _, orderFn := range ssn.queueInfoOrderFns {
|
||||
if result := orderFn(l, r); result != 0 {
|
||||
|
@ -67,3 +79,47 @@ func (ssn *Session) ResourceBindingInfoOrderFn(l, r interface{}) bool {
|
|||
|
||||
return lv.ResourceBinding.CreationTimestamp.Before(&rv.ResourceBinding.CreationTimestamp)
|
||||
}
|
||||
|
||||
// Allocatable invoke allocatable function of the plugins
|
||||
func (ssn *Session) Allocatable(queue *volcanoapi.QueueInfo, candidate *api.ResourceBindingInfo) bool {
|
||||
for _, fn := range ssn.allocatableFns {
|
||||
if !fn(queue, candidate) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Allocate event handlers
|
||||
func (ssn *Session) Allocate(rbi *api.ResourceBindingInfo) error {
|
||||
var errInfos []error
|
||||
for _, handler := range ssn.eventHandlers {
|
||||
if handler.AllocateFunc != nil {
|
||||
if err := handler.AllocateFunc(rbi); err != nil {
|
||||
errInfos = append(errInfos, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(errInfos) > 0 {
|
||||
return fmt.Errorf("resourceBindingInfo <%s/%s> allocate error and errInfos num is %d, UnAllocate will be called later to roll back the resources and status of the task",
|
||||
rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, len(errInfos))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnAllocate callback event handlers
|
||||
func (ssn *Session) UnAllocate(rbi *api.ResourceBindingInfo) error {
|
||||
var errInfos []error
|
||||
for _, handler := range ssn.eventHandlers {
|
||||
if handler.DeallocateFunc != nil {
|
||||
if err := handler.DeallocateFunc(rbi); err != nil {
|
||||
errInfos = append(errInfos, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(errInfos) > 0 {
|
||||
return fmt.Errorf("resourceBindingInfo <%s/%s> unallocate error and errInfos num is %d, UnAllocate will be called later to roll back the resources and status of the task",
|
||||
rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, len(errInfos))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -28,8 +28,8 @@ var (
|
|||
)
|
||||
|
||||
const (
|
||||
defaultDispatchPeriod = time.Second
|
||||
defaultQueue = "default"
|
||||
DefaultDispatchPeriod = time.Second
|
||||
DefaultQueue = "default"
|
||||
)
|
||||
|
||||
func newOptions() *options {
|
||||
|
@ -54,6 +54,6 @@ func RegisterDispatcherFlags() *pflag.FlagSet {
|
|||
|
||||
// AddFlags add dispatcher controller related flags.
|
||||
func (o *options) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.StringVar(&o.DefaultQueueName, "default-queue", defaultQueue, "The default queue name of the workload")
|
||||
fs.DurationVar(&o.DispatchPeriod, "dispatch-period", defaultDispatchPeriod, "The period between each scheduling cycle")
|
||||
fs.StringVar(&o.DefaultQueueName, "default-queue", DefaultQueue, "The default queue name of the workload")
|
||||
fs.DurationVar(&o.DispatchPeriod, "dispatch-period", DefaultDispatchPeriod, "The period between each scheduling cycle")
|
||||
}
|
||||
|
|
|
@ -17,18 +17,37 @@ limitations under the License.
|
|||
package capacity
|
||||
|
||||
import (
|
||||
"k8s.io/klog/v2"
|
||||
"volcano.sh/apis/pkg/apis/scheduling"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
volcanoapi "volcano.sh/volcano/pkg/scheduler/api"
|
||||
"volcano.sh/volcano/pkg/scheduler/api/helpers"
|
||||
|
||||
"volcano.sh/volcano-global/pkg/dispatcher/api"
|
||||
"volcano.sh/volcano-global/pkg/dispatcher/framework"
|
||||
)
|
||||
|
||||
const PluginName = "capacity"
|
||||
|
||||
type capacityPlugin struct{}
|
||||
type capacityPlugin struct {
|
||||
queueOpts map[volcanoapi.QueueID]*queueAttr
|
||||
}
|
||||
type queueAttr struct {
|
||||
queueID volcanoapi.QueueID
|
||||
name string
|
||||
share float64
|
||||
capability *volcanoapi.Resource
|
||||
// realCapacity represents the resource limit of the queue.
|
||||
realCapability *volcanoapi.Resource
|
||||
// allocated represents the resource request of all allocated jobs in the queue.
|
||||
allocated *volcanoapi.Resource
|
||||
}
|
||||
|
||||
func New() framework.Plugin {
|
||||
return &capacityPlugin{}
|
||||
return &capacityPlugin{
|
||||
queueOpts: map[volcanoapi.QueueID]*queueAttr{},
|
||||
}
|
||||
}
|
||||
|
||||
func (cp *capacityPlugin) Name() string {
|
||||
|
@ -36,26 +55,129 @@ func (cp *capacityPlugin) Name() string {
|
|||
}
|
||||
|
||||
func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
|
||||
cp.buildQueueAttrs(ssn)
|
||||
|
||||
// Register the Queue order func
|
||||
ssn.AddQueueInfoOrderFn(cp.Name(), cp.queueOrderFunc)
|
||||
// Register the Allocatable func
|
||||
ssn.AddAllocatableFn(cp.Name(), cp.allocatableFunc)
|
||||
// Register the Event handler func
|
||||
ssn.AddEventHandler(&framework.EventHandler{
|
||||
AllocateFunc: cp.allocateFunc,
|
||||
DeallocateFunc: cp.deallocateFunc,
|
||||
})
|
||||
}
|
||||
|
||||
func (cp *capacityPlugin) OnSessionClose(_ *framework.Session) {}
|
||||
func (cp *capacityPlugin) OnSessionClose(_ *framework.Session) {
|
||||
cp.queueOpts = nil
|
||||
}
|
||||
|
||||
func (cp *capacityPlugin) queueOrderFunc(l, r interface{}) int {
|
||||
lv := l.(*scheduling.Queue)
|
||||
rv := r.(*scheduling.Queue)
|
||||
lv := l.(*volcanoapi.QueueInfo)
|
||||
rv := r.(*volcanoapi.QueueInfo)
|
||||
|
||||
klog.V(4).Infof("Capacity plugin QueueOrder: <%s/%s> Queue priority %d, <%s/%s> Queue priority %d",
|
||||
lv.Namespace, lv.Name, lv.Spec.Priority, rv.Namespace, rv.Name, rv.Spec.Priority)
|
||||
lv.Queue.Namespace, lv.Queue.Name, lv.Queue.Spec.Priority, rv.Queue.Namespace, rv.Name, rv.Queue.Spec.Priority)
|
||||
|
||||
if lv.Spec.Priority == rv.Spec.Priority {
|
||||
if lv.Queue.Spec.Priority == rv.Queue.Spec.Priority {
|
||||
return 0
|
||||
}
|
||||
|
||||
if lv.Spec.Priority > rv.Spec.Priority {
|
||||
if lv.Queue.Spec.Priority > rv.Queue.Spec.Priority {
|
||||
return -1
|
||||
}
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
func (cp *capacityPlugin) buildQueueAttrs(ssn *framework.Session) {
|
||||
for _, rbi := range ssn.Snapshot.ResourceBindingInfos {
|
||||
queue, found := ssn.Snapshot.QueueInfos[ssn.GetResourceBindingInfoQueue(rbi)]
|
||||
if !found {
|
||||
klog.V(4).Infof("ResourceBindingInfo <%s/%s> used Queue <%s> not found in cache, ignore it.", rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, rbi.Queue)
|
||||
continue
|
||||
}
|
||||
var attr *queueAttr
|
||||
if attr, found = cp.queueOpts[queue.UID]; !found {
|
||||
attr = &queueAttr{
|
||||
queueID: queue.UID,
|
||||
name: queue.Name,
|
||||
allocated: volcanoapi.EmptyResource(),
|
||||
}
|
||||
if len(queue.Queue.Spec.Capability) != 0 {
|
||||
attr.capability = volcanoapi.NewResource(queue.Queue.Spec.Capability)
|
||||
if attr.capability.MilliCPU <= 0 {
|
||||
attr.capability.MilliCPU = math.MaxFloat64
|
||||
}
|
||||
if attr.capability.Memory <= 0 {
|
||||
attr.capability.Memory = math.MaxFloat64
|
||||
}
|
||||
}
|
||||
realCapability := ssn.Snapshot.TotalResource.Clone()
|
||||
if attr.capability == nil {
|
||||
attr.realCapability = realCapability
|
||||
} else {
|
||||
realCapability.MinDimensionResource(attr.capability, volcanoapi.Infinity)
|
||||
attr.realCapability = realCapability
|
||||
}
|
||||
cp.queueOpts[queue.UID] = attr
|
||||
}
|
||||
if rbi.DispatchStatus != api.Suspended {
|
||||
attr.allocated = attr.allocated.Add(rbi.ResReq)
|
||||
}
|
||||
}
|
||||
for _, attr := range cp.queueOpts {
|
||||
cp.updateShare(attr)
|
||||
klog.V(4).Infof("The attributes of queue <%s> in capacity: realCapability <%v>, allocated <%v>, share <%0.2f>.",
|
||||
attr.name, attr.realCapability, attr.allocated, attr.share)
|
||||
}
|
||||
}
|
||||
|
||||
func (cp *capacityPlugin) allocatableFunc(qi *volcanoapi.QueueInfo, candidate *api.ResourceBindingInfo) bool {
|
||||
attr := cp.queueOpts[qi.UID]
|
||||
futureUsed := attr.allocated.Clone().Add(candidate.ResReq)
|
||||
allocatable := futureUsed.LessEqualWithDimension(attr.realCapability, candidate.ResReq)
|
||||
if !allocatable {
|
||||
klog.V(3).Infof("Queue <%v>: realCapability <%v>, allocated <%v>; Candidate <%v/%v>: resource request <%v>",
|
||||
qi.Name, attr.realCapability, attr.allocated, candidate.ResourceBinding.Namespace, candidate.ResourceBinding.Name, candidate.ResReq)
|
||||
}
|
||||
return allocatable
|
||||
}
|
||||
|
||||
func (cp *capacityPlugin) allocateFunc(rbi *api.ResourceBindingInfo) error {
|
||||
attr, ok := cp.queueOpts[volcanoapi.QueueID(rbi.Queue)]
|
||||
if !ok {
|
||||
err := fmt.Errorf("queue <%v> not found in queueOpts", rbi.Queue)
|
||||
klog.Error(err)
|
||||
return err
|
||||
}
|
||||
attr.allocated.Add(rbi.ResReq)
|
||||
cp.updateShare(attr)
|
||||
klog.V(4).Infof("Capacity allocateFunc: ResourceBindingInfo <%v/%v>, resreq <%v>, share <%v>",
|
||||
rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, rbi.ResReq, attr.share)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cp *capacityPlugin) deallocateFunc(rbi *api.ResourceBindingInfo) error {
|
||||
attr, ok := cp.queueOpts[volcanoapi.QueueID(rbi.Queue)]
|
||||
if !ok {
|
||||
err := fmt.Errorf("queue <%v> not found in queueOpts", rbi.Queue)
|
||||
klog.Error(err)
|
||||
return err
|
||||
}
|
||||
attr.allocated.Sub(rbi.ResReq)
|
||||
cp.updateShare(attr)
|
||||
klog.V(4).Infof("Capacity deallocateFunc: ResourceBindingInfo <%v/%v>, resreq <%v>, share <%v>",
|
||||
rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, rbi.ResReq, attr.share)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cp *capacityPlugin) updateShare(attr *queueAttr) {
|
||||
res := float64(0)
|
||||
|
||||
for _, rn := range attr.realCapability.ResourceNames() {
|
||||
res = max(res, helpers.Share(attr.allocated.Get(rn), attr.realCapability.Get(rn)))
|
||||
}
|
||||
|
||||
attr.share = res
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ type FakeDispatcherCache struct {
|
|||
DefaultQueue string
|
||||
Queues map[string]*schedulingapi.QueueInfo
|
||||
ResourceBindingInfos map[types.UID]*api.ResourceBindingInfo
|
||||
TotalResource *schedulingapi.Resource
|
||||
|
||||
UnSuspendingOrder []types.NamespacedName
|
||||
}
|
||||
|
@ -50,6 +51,7 @@ func (f *FakeDispatcherCache) Snapshot() *cache.DispatcherCacheSnapshot {
|
|||
DefaultQueue: f.DefaultQueue,
|
||||
QueueInfos: f.Queues,
|
||||
ResourceBindingInfos: f.ResourceBindingInfos,
|
||||
TotalResource: f.TotalResource,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
Copyright 2025 The Volcano Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func CheckClusterReady(cluster *clusterv1alpha1.Cluster) (bool, string) {
|
||||
for _, condition := range cluster.Status.Conditions {
|
||||
if condition.Type == clusterv1alpha1.ClusterConditionReady {
|
||||
if condition.Status == metav1.ConditionTrue {
|
||||
return true, ""
|
||||
} else {
|
||||
return false, fmt.Sprintf("Cluster <%s> is not ready, reason: %s, message: %s", cluster.Name, condition.Reason, condition.Message)
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, fmt.Sprintf("Cluster<%s> has not %s Condition", cluster.Name, clusterv1alpha1.ClusterConditionReady)
|
||||
}
|
|
@ -22,6 +22,8 @@ import (
|
|||
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
|
||||
)
|
||||
|
||||
const defaultQueue = "default"
|
||||
|
||||
// GetObjQueue returns the queue name of an obj.
|
||||
// There are 3 ways to get queue name for now:
|
||||
// scheduling.volcano.sh/queue-name support only annotation
|
||||
|
@ -39,5 +41,5 @@ func GetObjQueue(obj metav1.Object) string {
|
|||
if _, ok := annotations[schedulingv1beta1.QueueNameAnnotationKey]; ok {
|
||||
return annotations[schedulingv1beta1.QueueNameAnnotationKey]
|
||||
}
|
||||
return ""
|
||||
return defaultQueue
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue