Compare commits

...

10 Commits
v0.1.0 ... main

Author SHA1 Message Date
Volcano Bot 04d1afc71d
Merge pull request #22 from Monokaix/dev
fix queue allocate err
2025-06-24 10:43:16 +08:00
Monokaix 6434092d7c fix queue allocate err
Signed-off-by: Monokaix <changxuzheng@huawei.com>
2025-06-19 15:43:46 +08:00
Volcano Bot 0aed15c49f
Merge pull request #21 from anryko/fix_docs_1
fix: docs example is broken
2025-06-19 15:31:09 +08:00
Andrej Svenke 3080740142 fix: docs example is broken
Signed-off-by: Andrej Svenke <anryko@nebius.com>
2025-06-11 11:24:29 +02:00
Volcano Bot f186d9c6de
Merge pull request #17 from tanberBro/main
bugfix: cache updates from events all use atomic locks and the calculation of allocated use rbi.DispatchStatus != api.Suspended.
2025-05-30 11:23:51 +08:00
tanberBro 46f6f7cba0 bugfix: cache updates from events all use atomic locks and the calculation of allocated use rbi.DispatchStatus != api.Suspended.
Signed-off-by: tanberBro <songpf1@chinatelecom.cn>
2025-05-30 10:37:42 +08:00
Volcano Bot 15294dac0e
Merge pull request #16 from tanberBro/main
feature: queue capacity management
2025-05-29 20:32:51 +08:00
tanberBro a6a4afb668 feature: queue capacity management
Signed-off-by: tanberBro <songpf1@chinatelecom.cn>
2025-05-29 15:52:25 +08:00
Volcano Bot fad3edc7f3
Merge pull request #13 from Monokaix/dev
Update karmada version
2025-02-21 18:52:32 +08:00
Monokaix 31214b1d27 Update karmada version
Signed-off-by: Monokaix <changxuzheng@huawei.com>
2025-02-20 16:02:51 +08:00
24 changed files with 681 additions and 119 deletions

View File

@ -15,7 +15,7 @@ You can modify the deployment method according to different environments.
## 1. Deploy the Karmada
Suggest `Karmada` Version: **master**
Suggest `Karmada` Version: **v1.13.0-beta.0 or higher**
Follow the [karmada get started guide](https://karmada.io/docs/get-started/nginx-example) to deploy `Karmada`.
@ -31,7 +31,7 @@ cd karmada
## 2. Deploy the Volcano to member clusters
Suggest `Volcano` Version: **1.10.0**
Suggest `Volcano` Version: **1.10.0 or higher**
Follow the [volcano installation guide](https://volcano.sh/en/docs/v1-9-0/installation/) to deploy `Volcano` to the member clusters.
@ -69,21 +69,6 @@ kubectl --context karmada-host annotate secret karmada-webhook-config \
## 4. Deploy the volcano-global controller and webhook manager at Karmada control plane cluster
You need to build the images from the root directory of the project.
```bash
git clone https://github.com/volcano-sh/volcano-global.git
cd volcano-global
# Build the components.
TAG=1.0 make images
# Load the image to karmada host cluster.
kind load docker-image --name karmada-host volcanosh/volcano-global-controller-manager:1.0
kind load docker-image --name karmada-host volcanosh/volcano-global-webhook-manager:1.0
```
```bash
# Switch to Karmada host kubeconfig.
export KUBECONFIG=$HOME/.kube/karmada.config

View File

@ -16,7 +16,7 @@ spec:
automountServiceAccountToken: false
containers:
- name: volcano-global-controllers-manager
image: volcanosh/volcano-global-controller-manager:1.0
image: volcanosh/volcano-global-controller-manager:latest
args:
- --kubeconfig=/etc/kubeconfig/karmada.config
- --leader-elect=false
@ -26,7 +26,7 @@ spec:
- --dispatch-period=1s
- -v=5
- 2>&1
imagePullPolicy: Never
imagePullPolicy: IfNotPresent
volumeMounts:
- name: webhook-config
mountPath: /etc/kubeconfig

View File

@ -31,8 +31,8 @@ spec:
- --port=8443
- -v=5
- 2>&1
image: volcanosh/volcano-global-webhook-manager:1.0
imagePullPolicy: Never
image: volcanosh/volcano-global-webhook-manager:latest
imagePullPolicy: IfNotPresent
volumeMounts:
- mountPath: /admission.local.config/certificates
name: admission-certs
@ -78,8 +78,8 @@ spec:
restartPolicy: Never
containers:
- name: main
image: volcanosh/volcano-global-webhook-manager:1.0
imagePullPolicy: Never
image: volcanosh/volcano-global-webhook-manager:latest
imagePullPolicy: IfNotPresent
command: ["./gen-admission-secret.sh", "--service", "volcano-global-webhook", "--namespace",
"volcano-global", "--secret", "volcano-global-webhook-cert"]
---

12
go.mod
View File

@ -1,9 +1,9 @@
module volcano.sh/volcano-global
go 1.22.9
go 1.22.11
require (
github.com/karmada-io/karmada v1.13.0-alpha.2.0.20250118102547-5bfdf615c712
github.com/karmada-io/karmada v1.13.0-beta.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
gomodules.xyz/jsonpatch/v2 v2.4.0
@ -58,11 +58,11 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/term v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/protobuf v1.34.2 // indirect

28
go.sum
View File

@ -95,8 +95,8 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/karmada-io/karmada v1.13.0-alpha.2.0.20250118102547-5bfdf615c712 h1:o3xSYSPVtififOUUfRJV96iWkKEhIZbf2d3+UAgmX+4=
github.com/karmada-io/karmada v1.13.0-alpha.2.0.20250118102547-5bfdf615c712/go.mod h1:ccoLWknjS1hHmBuAlRBP6Y55kLGe39GpTSs9LAW+7Wc=
github.com/karmada-io/karmada v1.13.0-beta.0 h1:1hdIYTC6SstBC3/85b6+CiCLufd73WvDTTupRA55Y2o=
github.com/karmada-io/karmada v1.13.0-beta.0/go.mod h1:1z5VJPUWPwfjZJnXJiIBJFjlZc/BTuD+5z1jJbFfHBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@ -201,8 +201,8 @@ go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@ -211,26 +211,26 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg=
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
FROM golang:1.22.9 AS builder
FROM golang:1.22.11 AS builder
WORKDIR /go/src/volcano.sh/
COPY go.mod go.sum ./
RUN go mod download

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
FROM golang:1.22.9 AS builder
FROM golang:1.22.11 AS builder
WORKDIR /go/src/volcano.sh/
COPY go.mod go.sum ./
RUN go mod download

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
FROM golang:1.22.9 AS builder
FROM golang:1.22.11 AS builder
WORKDIR /go/src/volcano.sh/
COPY go.mod go.sum ./
RUN go mod download

View File

@ -19,6 +19,7 @@ package api
import (
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"k8s.io/apimachinery/pkg/types"
volcanoapi "volcano.sh/volcano/pkg/scheduler/api"
)
type DispatchStatus int16
@ -35,6 +36,7 @@ type ResourceBindingInfo struct {
Queue string
PriorityClassName string
DispatchStatus DispatchStatus
ResReq *volcanoapi.Resource
// Update it when snapshot.
Priority int32
@ -47,6 +49,7 @@ func (rbi *ResourceBindingInfo) DeepCopy() *ResourceBindingInfo {
Queue: rbi.Queue,
PriorityClassName: rbi.PriorityClassName,
DispatchStatus: rbi.DispatchStatus,
ResReq: rbi.ResReq.Clone(),
Priority: rbi.Priority,
}

View File

@ -0,0 +1,23 @@
/*
Copyright 2025 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
volcanoapi "volcano.sh/volcano/pkg/scheduler/api"
)
// AllocatableFn is the signature of a predicate that receives a Volcano
// QueueInfo and a ResourceBindingInfo and returns a bool.
// NOTE(review): presumably it reports whether rbi may be allocated from qi
// without exceeding the queue's limits — confirm against the implementations.
type AllocatableFn func(qi *volcanoapi.QueueInfo, rbi *ResourceBindingInfo) bool

View File

@ -20,9 +20,11 @@ import (
"fmt"
"sync"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
karmadainformerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
informerclusterv1alpha1 "github.com/karmada-io/karmada/pkg/generated/informers/externalversions/cluster/v1alpha1"
informerworkv1aplha2 "github.com/karmada-io/karmada/pkg/generated/informers/externalversions/work/v1alpha2"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/meta"
@ -83,6 +85,10 @@ type DispatcherCache struct {
// resourceBindingInfos[namespace][name] = target ResourceBindingInfo.
resourceBindingInfos map[string]map[string]*api.ResourceBindingInfo
clusterInformer informerclusterv1alpha1.ClusterInformer
// clusters[name] = target Cluster
clusters map[string]*clusterv1alpha1.Cluster
// Queue of tasks for unsuspending ResourceBindings: when a ResourceBinding finishes dispatch,
// the Dispatcher adds a task here and updates ResourceBinding.spec.Suspend to false.
unSuspendRBTaskQueue workqueue.Interface
@ -135,6 +141,8 @@ func NewDispatcherCache(option *DispatcherCacheOption) DispatcherCacheInterface
resourceBindingInfos: map[string]map[string]*api.ResourceBindingInfo{},
clusters: map[string]*clusterv1alpha1.Cluster{},
unSuspendRBTaskQueue: workqueue.New(),
}
@ -158,6 +166,12 @@ func NewDispatcherCache(option *DispatcherCacheOption) DispatcherCacheInterface
UpdateFunc: sc.updateResourceBinding,
DeleteFunc: sc.deleteResourceBinding,
})
sc.clusterInformer = sc.karmadaInformerFactor.Cluster().V1alpha1().Clusters()
sc.clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.addCluster,
UpdateFunc: sc.updateCluster,
DeleteFunc: sc.deleteCluster,
})
return sc
}

View File

@ -19,7 +19,9 @@ package cache
import (
"context"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
schedulingv1 "k8s.io/api/scheduling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -27,8 +29,10 @@ import (
"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/apis/pkg/apis/scheduling/scheme"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
volcanoapi "volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano-global/pkg/dispatcher/api"
"volcano.sh/volcano-global/pkg/utils"
"volcano.sh/volcano-global/pkg/workload"
)
@ -38,17 +42,17 @@ func (dc *DispatcherCache) addQueue(obj interface{}) {
return
}
// Convert the queue from v1beta1 to v1
v1queue := &scheduling.Queue{}
if err := scheme.Scheme.Convert(queue, v1queue, nil); err != nil {
klog.Errorf("Failed to convert queue from %T to %T", queue, v1queue)
// Convert the queue from v1beta1 to internal
internalQueue := &scheduling.Queue{}
if err := scheme.Scheme.Convert(queue, internalQueue, nil); err != nil {
klog.Errorf("Failed to convert queue from %T to %T", queue, internalQueue)
return
}
dc.mutex.Lock()
defer dc.mutex.Unlock()
dc.queues[queue.Name] = schedulingapi.NewQueueInfo(v1queue)
dc.setQueue(internalQueue)
}
func (dc *DispatcherCache) deleteQueue(obj interface{}) {
@ -68,9 +72,25 @@ func (dc *DispatcherCache) updateQueue(oldObj, newObj interface{}) {
if oldQueue == nil || newQueue == nil {
return
}
if oldQueue.ResourceVersion == newQueue.ResourceVersion {
return
}
dc.deleteQueue(oldQueue)
dc.addQueue(newQueue)
// Convert the queue from v1beta1 to internal
internalQueue := &scheduling.Queue{}
if err := scheme.Scheme.Convert(newQueue, internalQueue, nil); err != nil {
klog.Errorf("Failed to convert queue from %T to %T", newQueue, internalQueue)
return
}
dc.mutex.Lock()
defer dc.mutex.Unlock()
dc.setQueue(internalQueue)
}
// setQueue stores a QueueInfo built from the given internal queue in the
// cache, keyed by the queue's name. An entry already stored under that name is
// overwritten. It does not take dc.mutex itself; the callers visible in this
// file acquire the lock before calling it.
func (dc *DispatcherCache) setQueue(queue *scheduling.Queue) {
	name := queue.Name
	dc.queues[name] = schedulingapi.NewQueueInfo(queue)
}
func (dc *DispatcherCache) addPriorityClass(obj interface{}) {
@ -81,12 +101,7 @@ func (dc *DispatcherCache) addPriorityClass(obj interface{}) {
dc.mutex.Lock()
defer dc.mutex.Unlock()
if pc.GlobalDefault {
klog.V(3).Infof("Set default PriorityClass to <%s>, Priority <%d>.", pc.Name, pc.Value)
dc.defaultPriorityClass = pc
}
dc.priorityClasses[pc.Name] = pc
dc.setPriorityClass(pc)
}
func (dc *DispatcherCache) deletePriorityClass(obj interface{}) {
@ -94,6 +109,7 @@ func (dc *DispatcherCache) deletePriorityClass(obj interface{}) {
if pc == nil {
return
}
dc.mutex.Lock()
defer dc.mutex.Unlock()
@ -110,8 +126,27 @@ func (dc *DispatcherCache) updatePriorityClass(oldObj, newObj interface{}) {
if oldPc == nil || newPc == nil {
return
}
dc.deletePriorityClass(oldPc)
dc.addPriorityClass(newPc)
if oldPc.ResourceVersion == newPc.ResourceVersion {
return
}
dc.mutex.Lock()
defer dc.mutex.Unlock()
if oldPc.GlobalDefault {
klog.V(5).Infof("Delete default PriorityClass <%s>, Priority <%d>.", oldPc.Name, oldPc.Value)
dc.defaultPriorityClass = nil
}
dc.setPriorityClass(newPc)
}
// setPriorityClass records pc in the priority-class cache under its name.
// When pc is flagged as the global default it is additionally remembered as
// the cache's default PriorityClass. It does not take dc.mutex itself; the
// callers visible in this file acquire the lock before calling it.
func (dc *DispatcherCache) setPriorityClass(pc *schedulingv1.PriorityClass) {
	name, value := pc.Name, pc.Value
	if pc.GlobalDefault {
		klog.V(3).Infof("Set default PriorityClass to <%s>, Priority <%d>.", name, value)
		dc.defaultPriorityClass = pc
	}

	dc.priorityClasses[name] = pc
}
func (dc *DispatcherCache) addResourceBinding(obj interface{}) {
@ -120,6 +155,48 @@ func (dc *DispatcherCache) addResourceBinding(obj interface{}) {
return
}
dc.mutex.Lock()
defer dc.mutex.Unlock()
dc.setResourceBinding(rb)
}
// deleteResourceBinding handles a ResourceBinding delete event.
//
// Objects that convertToResourceBinding cannot convert are ignored. Under
// dc.mutex, the binding and its ResourceBindingInfo are removed from the
// per-namespace maps; if the namespace has no entry in the ResourceBinding
// cache, an error is logged and nothing is removed.
func (dc *DispatcherCache) deleteResourceBinding(obj interface{}) {
	binding := convertToResourceBinding(obj)
	if binding == nil {
		return
	}

	dc.mutex.Lock()
	defer dc.mutex.Unlock()

	ns, name := binding.Namespace, binding.Name
	if dc.resourceBindings[ns] == nil {
		klog.Errorf("Failed to delete ResourceBinding <%s/%s>, the Resourcebinding's "+
			"Namespace is not in the cache.", ns, name)
		return
	}

	delete(dc.resourceBindings[ns], name)
	delete(dc.resourceBindingInfos[ns], name)
}
// updateResourceBinding handles a ResourceBinding update event.
//
// Both objects are converted first. The event is dropped when either
// conversion fails or when the ResourceVersion is unchanged. Otherwise the new
// object is written to the cache via setResourceBinding while holding
// dc.mutex.
func (dc *DispatcherCache) updateResourceBinding(oldObj, newObj interface{}) {
	previous, current := convertToResourceBinding(oldObj), convertToResourceBinding(newObj)

	switch {
	case previous == nil, current == nil:
		// At least one object was not a ResourceBinding.
		return
	case previous.ResourceVersion == current.ResourceVersion:
		// Nothing changed between the two versions.
		return
	}

	dc.mutex.Lock()
	defer dc.mutex.Unlock()

	dc.setResourceBinding(current)
}
func (dc *DispatcherCache) setResourceBinding(rb *workv1alpha2.ResourceBinding) {
// Check whether it is a workload; skip adding it to the cache if it is not.
isWorkload, newWorkloadFunc, err := workload.TryGetNewWorkloadFunc(rb.Spec.Resource)
if err != nil {
@ -145,10 +222,6 @@ func (dc *DispatcherCache) addResourceBinding(obj interface{}) {
return
}
dc.mutex.Lock()
defer dc.mutex.Unlock()
// Add the ResourceBinding to cache.
if dc.resourceBindings[rb.Namespace] == nil {
dc.resourceBindings[rb.Namespace] = map[string]*workv1alpha2.ResourceBinding{
rb.Name: rb,
@ -165,6 +238,13 @@ func (dc *DispatcherCache) addResourceBinding(obj interface{}) {
PriorityClassName: workload.PriorityClassName(),
DispatchStatus: api.UnSuspended,
}
resReq := volcanoapi.EmptyResource()
if rb.Spec.ReplicaRequirements != nil {
resReq = volcanoapi.NewResource(rb.Spec.ReplicaRequirements.ResourceRequest).Multi(float64(rb.Spec.Replicas))
}
newResourceBindingInfo.ResReq = resReq
// Currently, our failurePolicy is set to Fail, which ensures that no unexpected ResourceBindings will exist.
// When a ResourceBinding is created, it will definitely be updated to Suspend, so we don't need to check the Status,
// so rb should be suspended normally.
@ -181,33 +261,52 @@ func (dc *DispatcherCache) addResourceBinding(obj interface{}) {
}
}
func (dc *DispatcherCache) deleteResourceBinding(obj interface{}) {
rb := convertToResourceBinding(obj)
if rb == nil {
func (dc *DispatcherCache) addCluster(obj interface{}) {
cluster := convertToCluster(obj)
if cluster == nil {
return
}
if ready, message := utils.CheckClusterReady(cluster); !ready {
klog.V(5).Info(message)
return
}
dc.mutex.Lock()
defer dc.mutex.Unlock()
if dc.resourceBindings[rb.Namespace] == nil {
klog.Errorf("Failed to delete ResourceBinding <%s/%s>, the Resourcebinding's "+
"Namespace is not in the cache.", rb.Namespace, rb.Name)
return
} else {
delete(dc.resourceBindings[rb.Namespace], rb.Name)
delete(dc.resourceBindingInfos[rb.Namespace], rb.Name)
}
dc.setCluster(cluster)
}
func (dc *DispatcherCache) updateResourceBinding(oldObj, newObj interface{}) {
oldRb := convertToResourceBinding(oldObj)
newRb := convertToResourceBinding(newObj)
if oldRb == nil || newRb == nil {
func (dc *DispatcherCache) deleteCluster(obj interface{}) {
cluster := convertToCluster(obj)
if cluster == nil {
return
}
dc.deleteResourceBinding(oldRb)
dc.addResourceBinding(newRb)
dc.mutex.Lock()
defer dc.mutex.Unlock()
delete(dc.clusters, cluster.Name)
}
// updateCluster handles a Cluster update event from the informer.
//
// Events whose objects cannot be converted to a Cluster, and events where the
// ResourceVersion did not change, are ignored. Otherwise the cache is updated
// under dc.mutex. To stay consistent with addCluster, which refuses to cache a
// cluster that utils.CheckClusterReady reports as not ready, a cluster that is
// not ready after the update is removed from the cache instead of being
// stored, so it no longer contributes to the cached set of clusters.
func (dc *DispatcherCache) updateCluster(oldObj, newObj interface{}) {
	oldCluster := convertToCluster(oldObj)
	newCluster := convertToCluster(newObj)
	if oldCluster == nil || newCluster == nil {
		return
	}

	if oldCluster.ResourceVersion == newCluster.ResourceVersion {
		return
	}

	// Evaluate readiness before taking the lock; it only inspects newCluster.
	ready, message := utils.CheckClusterReady(newCluster)

	dc.mutex.Lock()
	defer dc.mutex.Unlock()

	if !ready {
		klog.V(5).Info(message)
		// Deleting a key that is absent is a no-op, so this is safe even if
		// the cluster was never cached.
		delete(dc.clusters, newCluster.Name)
		return
	}

	dc.setCluster(newCluster)
}
// setCluster stores the cluster in the cache under its name, replacing any
// entry already stored under that name. It does not take dc.mutex itself; the
// callers visible in this file acquire the lock before calling it.
func (dc *DispatcherCache) setCluster(cluster *clusterv1alpha1.Cluster) {
	key := cluster.Name
	dc.clusters[key] = cluster
}
func (dc *DispatcherCache) getResourceFromObjectReference(ref workv1alpha2.ObjectReference) (*unstructured.Unstructured, error) {

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache
import (
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/client-go/tools/cache"
@ -31,7 +32,7 @@ func convertToQueue(obj interface{}) *schedulingv1beta1.Queue {
queue, ok := obj.(*schedulingv1beta1.Queue)
if !ok {
klog.Errorf("Cant Convert obj to *schedulingv1beta1.Queue, obj: %v", obj)
klog.Errorf("Can't Convert obj to *schedulingv1beta1.Queue, obj: %v", obj)
return nil
}
return queue
@ -44,7 +45,7 @@ func convertToPriorityClass(obj interface{}) *schedulingv1.PriorityClass {
priorityClass, ok := obj.(*schedulingv1.PriorityClass)
if !ok {
klog.Errorf("Cant Convert obj to *schedulingv1.PriorityClass, obj: %v", obj)
klog.Errorf("Can't Convert obj to *schedulingv1.PriorityClass, obj: %v", obj)
return nil
}
return priorityClass
@ -57,8 +58,21 @@ func convertToResourceBinding(obj interface{}) *workv1alpha2.ResourceBinding {
resourceBinding, ok := obj.(*workv1alpha2.ResourceBinding)
if !ok {
klog.Errorf("Cant Convert obj to *workv1alpha2.ResourceBinding, obj: %v", obj)
klog.Errorf("Can't Convert obj to *workv1alpha2.ResourceBinding, obj: %v", obj)
return nil
}
return resourceBinding
}
// convertToCluster extracts a *clusterv1alpha1.Cluster from an informer event
// object. A cache.DeletedFinalStateUnknown tombstone is unwrapped first. If
// the resulting object is not a *clusterv1alpha1.Cluster, an error is logged
// and nil is returned.
func convertToCluster(obj interface{}) *clusterv1alpha1.Cluster {
	target := obj
	if ts, isTombstone := obj.(cache.DeletedFinalStateUnknown); isTombstone {
		target = ts.Obj
	}

	if c, isCluster := target.(*clusterv1alpha1.Cluster); isCluster {
		return c
	}

	klog.Errorf("Can't Convert obj to *clusterv1alpha1.Cluster, obj: %v", target)
	return nil
}

View File

@ -30,6 +30,8 @@ type DispatcherCacheSnapshot struct {
QueueInfos map[string]*schedulingapi.QueueInfo
ResourceBindingInfos map[types.UID]*api.ResourceBindingInfo
TotalResource *schedulingapi.Resource
}
func (dc *DispatcherCache) Snapshot() *DispatcherCacheSnapshot {
@ -40,12 +42,17 @@ func (dc *DispatcherCache) Snapshot() *DispatcherCacheSnapshot {
DefaultQueue: dc.defaultQueue,
QueueInfos: make(map[string]*schedulingapi.QueueInfo, len(dc.queues)),
ResourceBindingInfos: make(map[types.UID]*api.ResourceBindingInfo),
TotalResource: schedulingapi.EmptyResource(),
}
for _, queue := range dc.queues {
snapshot.QueueInfos[queue.Name] = queue.Clone()
}
for _, cluster := range dc.clusters {
snapshot.TotalResource.Add(schedulingapi.NewResource(cluster.Status.ResourceSummary.Allocatable))
}
// Collect the ResourceBindingInfos.
// First, we should update the elements of the ResourceBindingInfo,
// because we only set some elements of them when creating the ResourceBindingInfo.

View File

@ -169,13 +169,25 @@ func (dispatcher *Dispatcher) dispatch(ssn *dispatcherframework.Session) {
// Get the highest priority ResourceBinding from the queue.
rbi := resourceBindings.Pop().(*api.ResourceBindingInfo)
rbi.DispatchStatus = api.UnSuspending
dispatcher.cache.UnSuspendResourceBinding(types.NamespacedName{
Namespace: rbi.ResourceBinding.Namespace,
Name: rbi.ResourceBinding.Name,
})
dispatchResourceBindingCount++
if ssn.Allocatable(queue, rbi) {
if err := ssn.Allocate(rbi); err != nil {
klog.Errorf("Failed to allocate ResourceBindingInfo <%s/%s>, err: %v",
rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, err)
if err = ssn.UnAllocate(rbi); err != nil {
klog.Errorf("Failed to unallocate ResourceBindingInfo <%s/%s>, err: %v",
rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, err)
}
} else {
rbi.DispatchStatus = api.UnSuspending
dispatcher.cache.UnSuspendResourceBinding(types.NamespacedName{
Namespace: rbi.ResourceBinding.Namespace,
Name: rbi.ResourceBinding.Name,
})
dispatchResourceBindingCount++
}
} else {
klog.V(3).Infof("Queue <%s> is overused when considering ResourceBindingInfo <%s/%s>, ignore it.", queue.Name, rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name)
}
// Add the queue to roundedQueues if it still has ResourceBinding.
if !resourceBindings.Empty() {

View File

@ -20,12 +20,14 @@ import (
"testing"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano-global/pkg/dispatcher/api"
"volcano.sh/volcano-global/pkg/dispatcher/options"
"volcano.sh/volcano-global/pkg/dispatcher/uthelper"
)
@ -41,8 +43,8 @@ func TestDispatcherRoundRobin(t *testing.T) {
return schedulingapi.NewQueueInfo(queue)
}
// buildResoueceBinding return a ResourceBindingInfo
buildResoueceBinding := func(name string, namespace string, queueName string) *api.ResourceBindingInfo {
// buildResourceBinding return a ResourceBindingInfo
buildResourceBinding := func(name string, namespace string, queueName string) *api.ResourceBindingInfo {
rb := &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@ -76,8 +78,8 @@ func TestDispatcherRoundRobin(t *testing.T) {
buildQueue("queue1"),
},
resourceBindingInfos: []*api.ResourceBindingInfo{
buildResoueceBinding("rb1", "default", "queue1"),
buildResoueceBinding("rb2", "default", "queue1"),
buildResourceBinding("rb1", "default", "queue1"),
buildResourceBinding("rb2", "default", "queue1"),
},
expectedOrder: []types.NamespacedName{
{Namespace: "default", Name: "rb1"},
@ -91,12 +93,12 @@ func TestDispatcherRoundRobin(t *testing.T) {
buildQueue("queue2"),
},
resourceBindingInfos: []*api.ResourceBindingInfo{
buildResoueceBinding("rb1", "default", "queue1"),
buildResoueceBinding("rb2", "default", "queue1"),
buildResoueceBinding("rb3", "default", "queue1"),
buildResoueceBinding("rb4", "default", "queue2"),
buildResoueceBinding("rb5", "default", "queue2"),
buildResoueceBinding("rb6", "default", "queue2"),
buildResourceBinding("rb1", "default", "queue1"),
buildResourceBinding("rb2", "default", "queue1"),
buildResourceBinding("rb3", "default", "queue1"),
buildResourceBinding("rb4", "default", "queue2"),
buildResourceBinding("rb5", "default", "queue2"),
buildResourceBinding("rb6", "default", "queue2"),
},
expectedOrder: []types.NamespacedName{
{Namespace: "default", Name: "rb1"},
@ -126,7 +128,7 @@ func TestDispatcherRoundRobin(t *testing.T) {
resourceBindingInfoMap[rbi.ResourceUID] = rbi
}
fakeCache := &uthelper.FakeDispatcherCache{
DefaultQueue: defaultQueue,
DefaultQueue: options.DefaultQueue,
Queues: queueInfoMap,
ResourceBindingInfos: resourceBindingInfoMap,
UnSuspendingOrder: []types.NamespacedName{},
@ -135,7 +137,7 @@ func TestDispatcherRoundRobin(t *testing.T) {
// Create a dispatcher
dispatcher := &Dispatcher{
cache: fakeCache,
dispatchPeriod: defaultDispatchPeriod,
dispatchPeriod: options.DefaultDispatchPeriod,
}
dispatcher.runOnce()
@ -161,3 +163,157 @@ func TestDispatcherRoundRobin(t *testing.T) {
})
}
}
func TestDispatcherCapacity(t *testing.T) {
// buildQueue return a scheduling QueueInfo
buildQueue := func(name string, capacity corev1.ResourceList) *schedulingapi.QueueInfo {
queue := &schedulingv1beta1.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: name,
CreationTimestamp: metav1.Now(),
},
Spec: schedulingv1beta1.QueueSpec{
Capability: capacity,
},
}
return schedulingapi.NewQueueInfo(queue)
}
// buildResourceBinding return a ResourceBindingInfo
buildResourceBinding := func(name string, namespace string, queueName string, replicas int32, resReq corev1.ResourceList, dispatchStatus api.DispatchStatus) *api.ResourceBindingInfo {
rb := &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
CreationTimestamp: metav1.Now(),
},
Spec: workv1alpha2.ResourceBindingSpec{
Resource: workv1alpha2.ObjectReference{
UID: types.UID(name),
},
Replicas: replicas,
ReplicaRequirements: &workv1alpha2.ReplicaRequirements{
ResourceRequest: resReq,
},
},
}
return &api.ResourceBindingInfo{
ResourceBinding: rb,
ResourceUID: rb.Spec.Resource.UID,
Queue: queueName,
DispatchStatus: dispatchStatus,
}
}
// Define the tests
tests := []struct {
name string
queueInfos []*schedulingapi.QueueInfo
resourceBindingInfos []*api.ResourceBindingInfo
// expectedAllocatable lists the ResourceBindingInfos that are expected to be allocated.
expectedAllocatable []types.NamespacedName
// expectedUnAllocatable lists the ResourceBindingInfos that are expected not to be allocated.
expectedUnAllocatable []types.NamespacedName
}{
{
name: "Test only can allocate ResourceBindingInfo",
queueInfos: []*schedulingapi.QueueInfo{
buildQueue("queue1", schedulingapi.BuildResourceList("5", "5G")),
},
resourceBindingInfos: []*api.ResourceBindingInfo{
buildResourceBinding("rb1", "default", "queue1", 2, schedulingapi.BuildResourceList("1", "1G"), api.UnSuspended),
buildResourceBinding("rb2", "default", "queue1", 2, schedulingapi.BuildResourceList("1", "1G"), api.Suspended),
},
expectedAllocatable: []types.NamespacedName{
{Namespace: "default", Name: "rb2"},
},
},
{
name: "Test only can not allocate ResourceBindingInfo",
queueInfos: []*schedulingapi.QueueInfo{
buildQueue("queue1", schedulingapi.BuildResourceList("5", "5G")),
},
resourceBindingInfos: []*api.ResourceBindingInfo{
buildResourceBinding("rb1", "default", "queue1", 2, schedulingapi.BuildResourceList("1", "1G"), api.UnSuspended),
buildResourceBinding("rb2", "default", "queue1", 4, schedulingapi.BuildResourceList("1", "1G"), api.Suspended),
},
expectedUnAllocatable: []types.NamespacedName{
{Namespace: "default", Name: "rb2"},
},
},
{
name: "Test can allocate and can not allocate ResourceBindingInfo",
queueInfos: []*schedulingapi.QueueInfo{
buildQueue("queue1", schedulingapi.BuildResourceList("5", "5G")),
},
resourceBindingInfos: []*api.ResourceBindingInfo{
buildResourceBinding("rb1", "default", "queue1", 2, schedulingapi.BuildResourceList("1", "1G"), api.UnSuspended),
buildResourceBinding("rb2", "default", "queue1", 2, schedulingapi.BuildResourceList("1", "1G"), api.Suspended),
buildResourceBinding("rb3", "default", "queue1", 1, schedulingapi.BuildResourceList("2", "2G"), api.Suspended),
},
expectedAllocatable: []types.NamespacedName{
{Namespace: "default", Name: "rb2"},
},
expectedUnAllocatable: []types.NamespacedName{
{Namespace: "default", Name: "rb3"},
},
},
}
// testDispatcherCapacity is a helper function to test the capacity capability of dispatcher.
testDispatcherCapacity := func(t *testing.T,
queueInfos []*schedulingapi.QueueInfo,
resourceBindingInfos []*api.ResourceBindingInfo,
expectedAllocatable, expectedUnAllocatable []types.NamespacedName,
) {
// Create a fake dispatcher cache
queueInfoMap := make(map[string]*schedulingapi.QueueInfo)
for _, queueInfo := range queueInfos {
queueInfoMap[queueInfo.Name] = queueInfo
}
resourceBindingInfoMap := make(map[types.UID]*api.ResourceBindingInfo)
for _, rbi := range resourceBindingInfos {
resourceBindingInfoMap[rbi.ResourceUID] = rbi
}
fakeCache := &uthelper.FakeDispatcherCache{
DefaultQueue: options.DefaultQueue,
Queues: queueInfoMap,
ResourceBindingInfos: resourceBindingInfoMap,
UnSuspendingOrder: []types.NamespacedName{},
TotalResource: schedulingapi.NewResource(schedulingapi.BuildResourceList("100", "100G")),
}
// Create a dispatcher
dispatcher := &Dispatcher{
cache: fakeCache,
dispatchPeriod: options.DefaultDispatchPeriod,
}
dispatcher.runOnce()
// Check the expectedAllocatable
for _, nn := range expectedAllocatable {
for _, rbi := range fakeCache.ResourceBindingInfos {
if nn.Namespace == rbi.ResourceBinding.Namespace && nn.Name == rbi.ResourceBinding.Name && rbi.DispatchStatus != api.UnSuspended {
t.Errorf("ResourceBindingInfos <%s/%s> is not expected, expect allocatable", rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name)
}
}
}
// Check the expectedUnAllocatable
for _, nn := range expectedUnAllocatable {
for _, rbi := range fakeCache.ResourceBindingInfos {
if nn.Namespace == rbi.ResourceBinding.Namespace && nn.Name == rbi.ResourceBinding.Name && rbi.DispatchStatus != api.Suspended {
t.Errorf("ResourceBindingInfos <%s/%s> is not expected, expect unallocatable", rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name)
}
}
}
}
// Run the tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testDispatcherCapacity(t, tt.queueInfos, tt.resourceBindingInfos, tt.expectedAllocatable, tt.expectedUnAllocatable)
})
}
}

View File

@ -0,0 +1,27 @@
/*
Copyright 2025 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"volcano.sh/volcano-global/pkg/dispatcher/api"
)
// EventHandler groups the optional callbacks a plugin registers on a session
// to be notified when a ResourceBindingInfo is allocated or deallocated.
type EventHandler struct {
// AllocateFunc is invoked with the ResourceBindingInfo being allocated.
// It may be left nil, in which case the session skips it.
AllocateFunc func(rbi *api.ResourceBindingInfo) error
// DeallocateFunc is invoked with the ResourceBindingInfo being deallocated.
// It may be left nil, in which case the session skips it.
DeallocateFunc func(rbi *api.ResourceBindingInfo) error
}

View File

@ -36,6 +36,8 @@ type Session struct {
plugins map[string]Plugin
queueInfoOrderFns map[string]volcanoapi.CompareFn
resourceBindingInfoOrderFns map[string]volcanoapi.CompareFn
allocatableFns map[string]api.AllocatableFn
eventHandlers []*EventHandler
}
func OpenSession(cache dispatchercache.DispatcherCacheInterface) *Session {
@ -46,6 +48,7 @@ func OpenSession(cache dispatchercache.DispatcherCacheInterface) *Session {
plugins: map[string]Plugin{},
queueInfoOrderFns: map[string]volcanoapi.CompareFn{},
resourceBindingInfoOrderFns: map[string]volcanoapi.CompareFn{},
allocatableFns: map[string]api.AllocatableFn{},
}
// Register all the plugins to session.

View File

@ -17,6 +17,8 @@ limitations under the License.
package framework
import (
"fmt"
volcanoapi "volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano-global/pkg/dispatcher/api"
@ -32,6 +34,16 @@ func (ssn *Session) AddQueueInfoOrderFn(name string, compareFn volcanoapi.Compar
ssn.queueInfoOrderFns[name] = compareFn
}
// AddAllocatableFn registers fn as the allocatable check stored under name.
// Registering again with the same name replaces the previously stored function.
func (ssn *Session) AddAllocatableFn(name string, fn api.AllocatableFn) {
ssn.allocatableFns[name] = fn
}
// AddEventHandler appends eh to the session's list of event handlers.
// Handlers are kept in registration order; the same handler may be added more than once.
func (ssn *Session) AddEventHandler(eh *EventHandler) {
ssn.eventHandlers = append(ssn.eventHandlers, eh)
}
func (ssn *Session) QueueInfoOrderFn(l, r interface{}) bool {
for _, orderFn := range ssn.queueInfoOrderFns {
if result := orderFn(l, r); result != 0 {
@ -67,3 +79,47 @@ func (ssn *Session) ResourceBindingInfoOrderFn(l, r interface{}) bool {
return lv.ResourceBinding.CreationTimestamp.Before(&rv.ResourceBinding.CreationTimestamp)
}
// Allocatable asks every registered allocatable function whether candidate may
// be allocated in queue. It returns false as soon as one function rejects the
// candidate and true when none does (including when no function is registered).
func (ssn *Session) Allocatable(queue *volcanoapi.QueueInfo, candidate *api.ResourceBindingInfo) bool {
	for pluginName := range ssn.allocatableFns {
		check := ssn.allocatableFns[pluginName]
		if accepted := check(queue, candidate); !accepted {
			return false
		}
	}
	return true
}
// Allocate notifies every registered event handler that rbi has been allocated.
//
// Handlers that are nil or have no AllocateFunc are skipped. Every remaining
// handler is invoked even when an earlier one fails. All failures are collected
// and reported in a single error that carries both the number of failures and
// the text of each underlying error. It returns nil when no handler fails.
func (ssn *Session) Allocate(rbi *api.ResourceBindingInfo) error {
	var errInfos []error
	for _, handler := range ssn.eventHandlers {
		if handler == nil || handler.AllocateFunc == nil {
			continue
		}
		if err := handler.AllocateFunc(rbi); err != nil {
			errInfos = append(errInfos, err)
		}
	}
	if len(errInfos) == 0 {
		return nil
	}
	// Append the collected errors so the caller can see why allocation failed,
	// not only how many handlers failed.
	return fmt.Errorf("resourceBindingInfo <%s/%s> allocate error and errInfos num is %d, UnAllocate will be called later to roll back the resources and status of the task: %v",
		rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, len(errInfos), errInfos)
}
// UnAllocate notifies every registered event handler that rbi has been
// deallocated, typically to roll back a previous Allocate.
//
// Handlers that are nil or have no DeallocateFunc are skipped. Every remaining
// handler is invoked even when an earlier one fails. All failures are collected
// and reported in a single error that carries both the number of failures and
// the text of each underlying error. It returns nil when no handler fails.
func (ssn *Session) UnAllocate(rbi *api.ResourceBindingInfo) error {
	var errInfos []error
	for _, handler := range ssn.eventHandlers {
		if handler == nil || handler.DeallocateFunc == nil {
			continue
		}
		if err := handler.DeallocateFunc(rbi); err != nil {
			errInfos = append(errInfos, err)
		}
	}
	if len(errInfos) == 0 {
		return nil
	}
	// The rollback itself failed here, so the message must not promise a later
	// UnAllocate call; report the underlying errors instead.
	return fmt.Errorf("resourceBindingInfo <%s/%s> unallocate error and errInfos num is %d: %v",
		rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, len(errInfos), errInfos)
}

View File

@ -28,8 +28,8 @@ var (
)
const (
defaultDispatchPeriod = time.Second
defaultQueue = "default"
DefaultDispatchPeriod = time.Second
DefaultQueue = "default"
)
func newOptions() *options {
@ -54,6 +54,6 @@ func RegisterDispatcherFlags() *pflag.FlagSet {
// AddFlags add dispatcher controller related flags.
func (o *options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.DefaultQueueName, "default-queue", defaultQueue, "The default queue name of the workload")
fs.DurationVar(&o.DispatchPeriod, "dispatch-period", defaultDispatchPeriod, "The period between each scheduling cycle")
fs.StringVar(&o.DefaultQueueName, "default-queue", DefaultQueue, "The default queue name of the workload")
fs.DurationVar(&o.DispatchPeriod, "dispatch-period", DefaultDispatchPeriod, "The period between each scheduling cycle")
}

View File

@ -17,18 +17,37 @@ limitations under the License.
package capacity
import (
"k8s.io/klog/v2"
"volcano.sh/apis/pkg/apis/scheduling"
"fmt"
"math"
"k8s.io/klog/v2"
volcanoapi "volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/api/helpers"
"volcano.sh/volcano-global/pkg/dispatcher/api"
"volcano.sh/volcano-global/pkg/dispatcher/framework"
)
const PluginName = "capacity"
type capacityPlugin struct{}
type capacityPlugin struct {
queueOpts map[volcanoapi.QueueID]*queueAttr
}
// queueAttr holds the capacity bookkeeping the plugin keeps for a single queue.
type queueAttr struct {
// queueID is the UID of the queue this record belongs to.
queueID volcanoapi.QueueID
// name is the queue name; it is used in log messages.
name string
// share is the largest helpers.Share(allocated, realCapability) value over
// all resource names of realCapability, as computed by updateShare.
share float64
// capability is built from the queue's Spec.Capability; it stays nil when
// the spec declares no capability.
capability *volcanoapi.Resource
// realCapability represents the resource limit of the queue.
realCapability *volcanoapi.Resource
// allocated represents the resource request of all allocated jobs in the queue.
allocated *volcanoapi.Resource
}
func New() framework.Plugin {
return &capacityPlugin{}
return &capacityPlugin{
queueOpts: map[volcanoapi.QueueID]*queueAttr{},
}
}
func (cp *capacityPlugin) Name() string {
@ -36,26 +55,129 @@ func (cp *capacityPlugin) Name() string {
}
func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
cp.buildQueueAttrs(ssn)
// Register the Queue order func
ssn.AddQueueInfoOrderFn(cp.Name(), cp.queueOrderFunc)
// Register the Allocatable func
ssn.AddAllocatableFn(cp.Name(), cp.allocatableFunc)
// Register the Event handler func
ssn.AddEventHandler(&framework.EventHandler{
AllocateFunc: cp.allocateFunc,
DeallocateFunc: cp.deallocateFunc,
})
}
func (cp *capacityPlugin) OnSessionClose(_ *framework.Session) {}
func (cp *capacityPlugin) OnSessionClose(_ *framework.Session) {
cp.queueOpts = nil
}
func (cp *capacityPlugin) queueOrderFunc(l, r interface{}) int {
lv := l.(*scheduling.Queue)
rv := r.(*scheduling.Queue)
lv := l.(*volcanoapi.QueueInfo)
rv := r.(*volcanoapi.QueueInfo)
klog.V(4).Infof("Capacity plugin QueueOrder: <%s/%s> Queue priority %d, <%s/%s> Queue priority %d",
lv.Namespace, lv.Name, lv.Spec.Priority, rv.Namespace, rv.Name, rv.Spec.Priority)
lv.Queue.Namespace, lv.Queue.Name, lv.Queue.Spec.Priority, rv.Queue.Namespace, rv.Name, rv.Queue.Spec.Priority)
if lv.Spec.Priority == rv.Spec.Priority {
if lv.Queue.Spec.Priority == rv.Queue.Spec.Priority {
return 0
}
if lv.Spec.Priority > rv.Spec.Priority {
if lv.Queue.Spec.Priority > rv.Queue.Spec.Priority {
return -1
}
return 1
}
// buildQueueAttrs populates cp.queueOpts from the session snapshot.
//
// For every ResourceBindingInfo whose queue exists in the snapshot it creates
// the queue's attribute record on first sight and adds the binding's ResReq to
// the queue's allocated amount unless the binding is Suspended. Bindings whose
// queue is not in the snapshot are ignored. Once all bindings are processed the
// share of every queue is recomputed and logged.
func (cp *capacityPlugin) buildQueueAttrs(ssn *framework.Session) {
	// OnSessionClose resets queueOpts to nil; re-create it so that a plugin
	// instance opened again does not write into a nil map.
	if cp.queueOpts == nil {
		cp.queueOpts = map[volcanoapi.QueueID]*queueAttr{}
	}

	for _, rbi := range ssn.Snapshot.ResourceBindingInfos {
		queueName := ssn.GetResourceBindingInfoQueue(rbi)
		queue, found := ssn.Snapshot.QueueInfos[queueName]
		if !found {
			// Log the name that was actually looked up, which may differ from rbi.Queue.
			klog.V(4).Infof("ResourceBindingInfo <%s/%s> used Queue <%s> not found in cache, ignore it.",
				rbi.ResourceBinding.Namespace, rbi.ResourceBinding.Name, queueName)
			continue
		}

		attr, found := cp.queueOpts[queue.UID]
		if !found {
			attr = &queueAttr{
				queueID:   queue.UID,
				name:      queue.Name,
				allocated: volcanoapi.EmptyResource(),
			}

			if len(queue.Queue.Spec.Capability) != 0 {
				attr.capability = volcanoapi.NewResource(queue.Queue.Spec.Capability)
				// A non-positive CPU or memory capability is replaced by the
				// largest float so that dimension does not limit the queue.
				if attr.capability.MilliCPU <= 0 {
					attr.capability.MilliCPU = math.MaxFloat64
				}
				if attr.capability.Memory <= 0 {
					attr.capability.Memory = math.MaxFloat64
				}
			}

			// realCapability starts from the snapshot's total resource and is
			// combined with the declared capability, when there is one.
			realCapability := ssn.Snapshot.TotalResource.Clone()
			if attr.capability != nil {
				realCapability.MinDimensionResource(attr.capability, volcanoapi.Infinity)
			}
			attr.realCapability = realCapability

			cp.queueOpts[queue.UID] = attr
		}

		// Only bindings that are not suspended count as allocated.
		if rbi.DispatchStatus != api.Suspended {
			attr.allocated = attr.allocated.Add(rbi.ResReq)
		}
	}

	for _, attr := range cp.queueOpts {
		cp.updateShare(attr)
		klog.V(4).Infof("The attributes of queue <%s> in capacity: realCapability <%v>, allocated <%v>, share <%0.2f>.",
			attr.name, attr.realCapability, attr.allocated, attr.share)
	}
}
// allocatableFunc reports whether candidate can be allocated in queue qi
// without the queue's allocated resources plus the candidate's request
// exceeding the queue's realCapability, compared on the dimensions of the
// candidate's request.
//
// A queue that has no entry in queueOpts is treated as not allocatable instead
// of dereferencing a missing record, matching the lookup guard used by
// allocateFunc and deallocateFunc.
func (cp *capacityPlugin) allocatableFunc(qi *volcanoapi.QueueInfo, candidate *api.ResourceBindingInfo) bool {
	attr, ok := cp.queueOpts[qi.UID]
	if !ok {
		klog.Errorf("queue <%v> not found in queueOpts, candidate <%v/%v> is treated as not allocatable",
			qi.Name, candidate.ResourceBinding.Namespace, candidate.ResourceBinding.Name)
		return false
	}

	// Work on a clone so the probe does not change the queue's bookkeeping.
	futureUsed := attr.allocated.Clone().Add(candidate.ResReq)
	allocatable := futureUsed.LessEqualWithDimension(attr.realCapability, candidate.ResReq)
	if !allocatable {
		klog.V(3).Infof("Queue <%v>: realCapability <%v>, allocated <%v>; Candidate <%v/%v>: resource request <%v>",
			qi.Name, attr.realCapability, attr.allocated, candidate.ResourceBinding.Namespace, candidate.ResourceBinding.Name, candidate.ResReq)
	}
	return allocatable
}
// allocateFunc is the plugin's allocate event handler: it adds the request of
// rbi to the allocated amount of rbi's queue and refreshes the queue's share.
// It logs and returns an error when the queue has no entry in queueOpts.
func (cp *capacityPlugin) allocateFunc(rbi *api.ResourceBindingInfo) error {
	queueID := volcanoapi.QueueID(rbi.Queue)
	if attr, found := cp.queueOpts[queueID]; found {
		attr.allocated.Add(rbi.ResReq)
		cp.updateShare(attr)

		binding := rbi.ResourceBinding
		klog.V(4).Infof("Capacity allocateFunc: ResourceBindingInfo <%v/%v>, resreq <%v>, share <%v>",
			binding.Namespace, binding.Name, rbi.ResReq, attr.share)
		return nil
	}

	notFound := fmt.Errorf("queue <%v> not found in queueOpts", rbi.Queue)
	klog.Error(notFound)
	return notFound
}
// deallocateFunc is the plugin's deallocate event handler: it subtracts the
// request of rbi from the allocated amount of rbi's queue and refreshes the
// queue's share. It logs and returns an error when the queue has no entry in
// queueOpts.
func (cp *capacityPlugin) deallocateFunc(rbi *api.ResourceBindingInfo) error {
	queueID := volcanoapi.QueueID(rbi.Queue)
	if attr, found := cp.queueOpts[queueID]; found {
		attr.allocated.Sub(rbi.ResReq)
		cp.updateShare(attr)

		binding := rbi.ResourceBinding
		klog.V(4).Infof("Capacity deallocateFunc: ResourceBindingInfo <%v/%v>, resreq <%v>, share <%v>",
			binding.Namespace, binding.Name, rbi.ResReq, attr.share)
		return nil
	}

	notFound := fmt.Errorf("queue <%v> not found in queueOpts", rbi.Queue)
	klog.Error(notFound)
	return notFound
}
// updateShare recomputes attr.share as the largest helpers.Share value of
// allocated against realCapability over every resource name of realCapability.
// The share is 0 when realCapability reports no resource names.
func (cp *capacityPlugin) updateShare(attr *queueAttr) {
	limit, used := attr.realCapability, attr.allocated

	highest := float64(0)
	for _, resourceName := range limit.ResourceNames() {
		ratio := helpers.Share(used.Get(resourceName), limit.Get(resourceName))
		highest = max(highest, ratio)
	}
	attr.share = highest
}

View File

@ -33,6 +33,7 @@ type FakeDispatcherCache struct {
DefaultQueue string
Queues map[string]*schedulingapi.QueueInfo
ResourceBindingInfos map[types.UID]*api.ResourceBindingInfo
TotalResource *schedulingapi.Resource
UnSuspendingOrder []types.NamespacedName
}
@ -50,6 +51,7 @@ func (f *FakeDispatcherCache) Snapshot() *cache.DispatcherCacheSnapshot {
DefaultQueue: f.DefaultQueue,
QueueInfos: f.Queues,
ResourceBindingInfos: f.ResourceBindingInfos,
TotalResource: f.TotalResource,
}
}

37
pkg/utils/cluster.go Normal file
View File

@ -0,0 +1,37 @@
/*
Copyright 2025 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
"fmt"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// CheckClusterReady reports whether cluster carries a Ready condition whose
// status is True.
//
// It returns (true, "") in that case. Otherwise it returns false together with
// a human-readable explanation: the condition's reason and message when the
// Ready condition exists but is not True, or a note that the condition is
// missing. The first Ready condition found decides the result. A nil cluster is
// reported as not ready.
func CheckClusterReady(cluster *clusterv1alpha1.Cluster) (bool, string) {
	if cluster == nil {
		return false, "Cluster is nil"
	}

	for _, condition := range cluster.Status.Conditions {
		if condition.Type != clusterv1alpha1.ClusterConditionReady {
			continue
		}
		if condition.Status == metav1.ConditionTrue {
			return true, ""
		}
		return false, fmt.Sprintf("Cluster <%s> is not ready, reason: %s, message: %s", cluster.Name, condition.Reason, condition.Message)
	}

	return false, fmt.Sprintf("Cluster<%s> has not %s Condition", cluster.Name, clusterv1alpha1.ClusterConditionReady)
}

View File

@ -22,6 +22,8 @@ import (
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)
// defaultQueue is the queue name GetObjQueue falls back to when the object
// does not specify a queue.
const defaultQueue = "default"
// GetObjQueue returns the queue name of an obj.
// There are 3 ways to get queue name for now:
// scheduling.volcano.sh/queue-name support only annotation
@ -39,5 +41,5 @@ func GetObjQueue(obj metav1.Object) string {
if _, ok := annotations[schedulingv1beta1.QueueNameAnnotationKey]; ok {
return annotations[schedulingv1beta1.QueueNameAnnotationKey]
}
return ""
return defaultQueue
}