priority scheduling: add priority queue

Signed-off-by: whitewindmills <jayfantasyhjh@gmail.com>
This commit is contained in:
whitewindmills 2024-05-14 17:20:34 +08:00
parent 15800f237e
commit 6c8811a446
13 changed files with 1642 additions and 52 deletions

View File

@ -204,3 +204,12 @@ func (s *ResourceBindingSpec) SchedulingSuspended() bool {
return *s.Suspension.Scheduling
}
// SchedulePriorityValue returns the scheduling priority declared
// by '.spec.SchedulePriority.Priority'.
func (s *ResourceBindingSpec) SchedulePriorityValue() int32 {
if s.SchedulePriority == nil {
return 0
}
return s.SchedulePriority.Priority
}

View File

@ -33,6 +33,8 @@ import (
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/features"
internalqueue "github.com/karmada-io/karmada/pkg/scheduler/internal/queue"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
@ -117,14 +119,41 @@ func (s *Scheduler) resourceBindingEventFilter(obj interface{}) bool {
util.GetLabelValue(accessor.GetLabels(), workv1alpha2.BindingManagedByLabel) != ""
}
func newQueuedBindingInfo(obj interface{}) *internalqueue.QueuedBindingInfo {
switch t := obj.(type) {
case *workv1alpha2.ResourceBinding:
return &internalqueue.QueuedBindingInfo{
NamespacedKey: cache.ObjectName{Namespace: t.GetNamespace(), Name: t.GetName()}.String(),
Priority: t.Spec.SchedulePriorityValue(),
}
case *workv1alpha2.ClusterResourceBinding:
return &internalqueue.QueuedBindingInfo{
NamespacedKey: t.GetName(),
Priority: t.Spec.SchedulePriorityValue(),
}
}
return nil
}
func (s *Scheduler) onResourceBindingAdd(obj interface{}) {
if features.FeatureGate.Enabled(features.PriorityBasedScheduling) {
bindingInfo := newQueuedBindingInfo(obj)
if bindingInfo == nil {
// shouldn't happen
klog.Errorf("couldn't convert to QueuedBindingInfo %#v", obj)
return
}
s.priorityQueue.Push(bindingInfo)
} else {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
klog.Errorf("couldn't get key for object %#v: %v", obj, err)
return
}
s.queue.Add(key)
}
metrics.CountSchedulerBindings(metrics.BindingAdd)
}
@ -150,6 +179,16 @@ func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) {
return
}
if features.FeatureGate.Enabled(features.PriorityBasedScheduling) {
bindingInfo := newQueuedBindingInfo(cur)
if bindingInfo == nil {
// shouldn't happen
klog.Errorf("couldn't convert to QueuedBindingInfo %#v", cur)
return
}
s.priorityQueue.Push(bindingInfo)
} else {
key, err := cache.MetaNamespaceKeyFunc(cur)
if err != nil {
klog.Errorf("couldn't get key for object %#v: %v", cur, err)
@ -157,10 +196,18 @@ func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) {
}
s.queue.Add(key)
}
metrics.CountSchedulerBindings(metrics.BindingUpdate)
}
func (s *Scheduler) onResourceBindingRequeue(binding *workv1alpha2.ResourceBinding, event string) {
klog.Infof("Requeue ResourceBinding(%s/%s) due to event(%s).", binding.Namespace, binding.Name, event)
if features.FeatureGate.Enabled(features.PriorityBasedScheduling) {
s.priorityQueue.Push(&internalqueue.QueuedBindingInfo{
NamespacedKey: cache.ObjectName{Namespace: binding.Namespace, Name: binding.Name}.String(),
Priority: binding.Spec.SchedulePriorityValue(),
})
} else {
key, err := cache.MetaNamespaceKeyFunc(binding)
if err != nil {
klog.Errorf("couldn't get key for ResourceBinding(%s/%s): %v", binding.Namespace, binding.Name, err)
@ -168,10 +215,18 @@ func (s *Scheduler) onResourceBindingRequeue(binding *workv1alpha2.ResourceBindi
}
klog.Infof("Requeue ResourceBinding(%s/%s) due to event(%s).", binding.Namespace, binding.Name, event)
s.queue.Add(key)
}
metrics.CountSchedulerBindings(event)
}
func (s *Scheduler) onClusterResourceBindingRequeue(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, event string) {
klog.Infof("Requeue ClusterResourceBinding(%s) due to event(%s).", clusterResourceBinding.Name, event)
if features.FeatureGate.Enabled(features.PriorityBasedScheduling) {
s.priorityQueue.Push(&internalqueue.QueuedBindingInfo{
NamespacedKey: clusterResourceBinding.Name,
Priority: clusterResourceBinding.Spec.SchedulePriorityValue(),
})
} else {
key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding)
if err != nil {
klog.Errorf("couldn't get key for ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err)
@ -179,6 +234,7 @@ func (s *Scheduler) onClusterResourceBindingRequeue(clusterResourceBinding *work
}
klog.Infof("Requeue ClusterResourceBinding(%s) due to event(%s).", clusterResourceBinding.Name, event)
s.queue.Add(key)
}
metrics.CountSchedulerBindings(event)
}

View File

@ -0,0 +1,249 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Below is the implementation of the a heap. The logic is pretty much the same
// as cache.heap, however, this heap does not perform synchronization. It leaves
// synchronization to the SchedulingQueue.
// This code is basically lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.32/pkg/scheduler/backend/heap/heap.go
package heap
import (
"container/heap"
"fmt"
metrics "github.com/karmada-io/karmada/pkg/scheduler/metrics/queue"
)
// KeyFunc is a function type to get the key from an object.
type KeyFunc[T any] func(obj T) string
type heapItem[T any] struct {
obj T // The object which is stored in the heap.
index int // The index of the object's key in the Heap.queue.
}
type itemKeyValue[T any] struct {
key string
obj T
}
// data is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type data[T any] struct {
// items is a map from key of the objects to the objects and their index.
// We depend on the property that items in the map are in the queue and vice versa.
items map[string]*heapItem[T]
// queue implements a heap data structure and keeps the order of elements
// according to the heap invariant. The queue keeps the keys of objects stored
// in "items".
queue []string
// keyFunc is used to make the key used for queued item insertion and retrieval, and
// should be deterministic.
keyFunc KeyFunc[T]
// lessFunc is used to compare two objects in the heap.
lessFunc LessFunc[T]
}
var (
_ = heap.Interface(&data[any]{}) // heapData is a standard heap
)
// Less compares two objects and returns true if the first one should go
// in front of the second one in the heap.
func (h *data[T]) Less(i, j int) bool {
if i > len(h.queue) || j > len(h.queue) {
return false
}
itemi, ok := h.items[h.queue[i]]
if !ok {
return false
}
itemj, ok := h.items[h.queue[j]]
if !ok {
return false
}
return h.lessFunc(itemi.obj, itemj.obj)
}
// Len returns the number of items in the Heap.
func (h *data[T]) Len() int { return len(h.queue) }
// Swap implements swapping of two elements in the heap. This is a part of standard
// heap interface and should never be called directly.
func (h *data[T]) Swap(i, j int) {
if i < 0 || j < 0 {
return
}
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
item := h.items[h.queue[i]]
item.index = i
item = h.items[h.queue[j]]
item.index = j
}
// Push is supposed to be called by container/heap.Push only.
func (h *data[T]) Push(kv interface{}) {
keyValue := kv.(*itemKeyValue[T])
n := len(h.queue)
h.items[keyValue.key] = &heapItem[T]{keyValue.obj, n}
h.queue = append(h.queue, keyValue.key)
}
// Pop is supposed to be called by container/heap.Pop only.
func (h *data[T]) Pop() interface{} {
if len(h.queue) == 0 {
return nil
}
key := h.queue[len(h.queue)-1]
h.queue = h.queue[0 : len(h.queue)-1]
item, ok := h.items[key]
if !ok {
// This is an error
return nil
}
delete(h.items, key)
return item.obj
}
// Peek returns the head of the heap without removing it.
func (h *data[T]) Peek() (T, bool) {
if len(h.queue) > 0 {
return h.items[h.queue[0]].obj, true
}
var zero T
return zero, false
}
// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
type Heap[T any] struct {
// data stores objects and has a queue that keeps their ordering according
// to the heap invariant.
data *data[T]
// metricRecorder updates the counter when elements of a heap get added or
// removed, and it does nothing if it's nil
metricRecorder metrics.MetricRecorder
}
// AddOrUpdate inserts an item, and puts it in the queue. The item is updated if it
// already exists.
func (h *Heap[T]) AddOrUpdate(obj T) {
key := h.data.keyFunc(obj)
if _, exists := h.data.items[key]; exists {
h.data.items[key].obj = obj
heap.Fix(h.data, h.data.items[key].index)
} else {
heap.Push(h.data, &itemKeyValue[T]{key, obj})
if h.metricRecorder != nil {
h.metricRecorder.Inc()
}
}
}
// Delete removes an item.
func (h *Heap[T]) Delete(obj T) error {
key := h.data.keyFunc(obj)
if item, ok := h.data.items[key]; ok {
heap.Remove(h.data, item.index)
if h.metricRecorder != nil {
h.metricRecorder.Dec()
}
return nil
}
return fmt.Errorf("object not found")
}
// Peek returns the head of the heap without removing it.
func (h *Heap[T]) Peek() (T, bool) {
return h.data.Peek()
}
// Pop returns the head of the heap and removes it.
func (h *Heap[T]) Pop() (T, error) {
obj := heap.Pop(h.data)
if obj != nil {
if h.metricRecorder != nil {
h.metricRecorder.Dec()
}
return obj.(T), nil
}
var zero T
return zero, fmt.Errorf("heap is empty")
}
// Get returns the requested item, or sets exists=false.
func (h *Heap[T]) Get(obj T) (T, bool) {
key := h.data.keyFunc(obj)
return h.GetByKey(key)
}
// GetByKey returns the requested item, or sets exists=false.
func (h *Heap[T]) GetByKey(key string) (T, bool) {
item, exists := h.data.items[key]
if !exists {
var zero T
return zero, false
}
return item.obj, true
}
// Has checks if obj exists.
func (h *Heap[T]) Has(obj T) bool {
key := h.data.keyFunc(obj)
_, ok := h.GetByKey(key)
return ok
}
// List returns a list of all the items.
func (h *Heap[T]) List() []T {
list := make([]T, 0, len(h.data.items))
for _, item := range h.data.items {
list = append(list, item.obj)
}
return list
}
// Len returns the number of items in the heap.
func (h *Heap[T]) Len() int {
return len(h.data.queue)
}
// New returns a Heap which can be used to queue up items to process.
func New[T any](keyFn KeyFunc[T], lessFn LessFunc[T]) *Heap[T] {
return NewWithRecorder(keyFn, lessFn, nil)
}
// NewWithRecorder wraps an optional metricRecorder to compose a Heap object.
func NewWithRecorder[T any](keyFn KeyFunc[T], lessFn LessFunc[T], metricRecorder metrics.MetricRecorder) *Heap[T] {
return &Heap[T]{
data: &data[T]{
items: map[string]*heapItem[T]{},
queue: []string{},
keyFunc: keyFn,
lessFunc: lessFn,
},
metricRecorder: metricRecorder,
}
}
// LessFunc is a function that receives two items and returns true if the first
// item should be placed before the second one when the list is sorted.
type LessFunc[T any] func(item1, item2 T) bool

View File

@ -0,0 +1,307 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file was copied from client-go/tools/cache/heap.go and modified
// for our non thread-safe heap
// This code is basically lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.32/pkg/scheduler/backend/heap/heap_test.go
package heap
import (
"testing"
)
func testHeapObjectKeyFunc(obj testHeapObject) string {
return obj.name
}
type testHeapObject struct {
name string
val interface{}
}
type testMetricRecorder int
func (tmr *testMetricRecorder) Inc() {
if tmr != nil {
*tmr++
}
}
func (tmr *testMetricRecorder) Dec() {
if tmr != nil {
*tmr--
}
}
func (tmr *testMetricRecorder) Clear() {
if tmr != nil {
*tmr = 0
}
}
func mkHeapObj(name string, val interface{}) testHeapObject {
return testHeapObject{name: name, val: val}
}
func compareInts(val1 testHeapObject, val2 testHeapObject) bool {
first := val1.val.(int)
second := val2.val.(int)
return first < second
}
// TestHeapBasic tests Heap invariant
func TestHeapBasic(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
const amount = 500
var i int
var zero testHeapObject
// empty queue
if item, ok := h.Peek(); ok || item != zero {
t.Errorf("expected nil object but got %v", item)
}
for i = amount; i > 0; i-- {
h.AddOrUpdate(mkHeapObj(string([]rune{'a', rune(i)}), i))
// Retrieve head without removing it
head, ok := h.Peek()
if e, a := i, head.val; !ok || a != e {
t.Errorf("expected %d, got %d", e, a)
}
}
// Make sure that the numbers are popped in ascending order.
prevNum := 0
for i := 0; i < amount; i++ {
item, err := h.Pop()
num := item.val.(int)
// All the items must be sorted.
if err != nil || prevNum > num {
t.Errorf("got %v out of order, last was %v", item, prevNum)
}
prevNum = num
}
_, err := h.Pop()
if err == nil {
t.Errorf("expected Pop() to error on empty heap")
}
}
// TestHeap_AddOrUpdate_Add tests add capabilities of Heap.AddOrUpdate
// and ensures that heap invariant is preserved after adding items.
func TestHeap_AddOrUpdate_Add(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("baz", 11))
h.AddOrUpdate(mkHeapObj("zab", 30))
h.AddOrUpdate(mkHeapObj("foo", 13)) // This updates "foo".
item, err := h.Pop()
if e, a := 1, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
item, err = h.Pop()
if e, a := 11, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
if err := h.Delete(mkHeapObj("baz", 11)); err == nil { // Nothing is deleted.
t.Fatalf("nothing should be deleted from the heap")
}
h.AddOrUpdate(mkHeapObj("foo", 14)) // foo is updated.
item, err = h.Pop()
if e, a := 14, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
item, err = h.Pop()
if e, a := 30, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
}
// TestHeap_Delete tests Heap.Delete and ensures that heap invariant is
// preserved after deleting items.
func TestHeap_Delete(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("bal", 31))
h.AddOrUpdate(mkHeapObj("baz", 11))
// Delete head. Delete should work with "key" and doesn't care about the value.
if err := h.Delete(mkHeapObj("bar", 200)); err != nil {
t.Fatalf("Failed to delete head.")
}
item, err := h.Pop()
if e, a := 10, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
h.AddOrUpdate(mkHeapObj("zab", 30))
h.AddOrUpdate(mkHeapObj("faz", 30))
curLen := h.data.Len()
// Delete non-existing item.
if err = h.Delete(mkHeapObj("non-existent", 10)); err == nil || curLen != h.data.Len() {
t.Fatalf("Didn't expect any item removal")
}
// Delete tail.
if err = h.Delete(mkHeapObj("bal", 31)); err != nil {
t.Fatalf("Failed to delete tail.")
}
// Delete one of the items with value 30.
if err = h.Delete(mkHeapObj("zab", 30)); err != nil {
t.Fatalf("Failed to delete item.")
}
item, err = h.Pop()
if e, a := 11, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
item, err = h.Pop()
if e, a := 30, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
if h.data.Len() != 0 {
t.Fatalf("expected an empty heap.")
}
}
// TestHeap_AddOrUpdate_Update tests update capabilities of Heap.Update
// and ensures that heap invariant is preserved after adding items.
func TestHeap_AddOrUpdate_Update(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("bal", 31))
h.AddOrUpdate(mkHeapObj("baz", 11))
// Update an item to a value that should push it to the head.
h.AddOrUpdate(mkHeapObj("baz", 0))
if h.data.queue[0] != "baz" || h.data.items["baz"].index != 0 {
t.Fatalf("expected baz to be at the head")
}
item, err := h.Pop()
if e, a := 0, item.val; err != nil || a != e {
t.Fatalf("expected %d, got %d", e, a)
}
// Update bar to push it farther back in the queue.
h.AddOrUpdate(mkHeapObj("bar", 100))
if h.data.queue[0] != "foo" || h.data.items["foo"].index != 0 {
t.Fatalf("expected foo to be at the head")
}
}
// TestHeap_Get tests Heap.Get.
func TestHeap_Get(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("bal", 31))
h.AddOrUpdate(mkHeapObj("baz", 11))
// Get works with the key.
item, exists := h.Get(mkHeapObj("baz", 0))
if !exists || item.val != 11 {
t.Fatalf("unexpected error in getting element")
}
// Get non-existing object.
_, exists = h.Get(mkHeapObj("non-existing", 0))
if exists {
t.Fatalf("didn't expect to get any object")
}
}
// TestHeap_GetByKey tests Heap.GetByKey and is very similar to TestHeap_Get.
func TestHeap_GetByKey(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("bal", 31))
h.AddOrUpdate(mkHeapObj("baz", 11))
item, exists := h.GetByKey("baz")
if !exists || item.val != 11 {
t.Fatalf("unexpected error in getting element")
}
// Get non-existing object.
_, exists = h.GetByKey("non-existing")
if exists {
t.Fatalf("didn't expect to get any object")
}
}
// TestHeap_List tests Heap.List function.
func TestHeap_List(t *testing.T) {
h := New(testHeapObjectKeyFunc, compareInts)
list := h.List()
if len(list) != 0 {
t.Errorf("expected an empty list")
}
items := map[string]int{
"foo": 10,
"bar": 1,
"bal": 30,
"baz": 11,
"faz": 30,
}
for k, v := range items {
h.AddOrUpdate(mkHeapObj(k, v))
}
list = h.List()
if len(list) != len(items) {
t.Errorf("expected %d items, got %d", len(items), len(list))
}
for _, heapObj := range list {
v, ok := items[heapObj.name]
if !ok || v != heapObj.val {
t.Errorf("unexpected item in the list: %v", heapObj)
}
}
}
func TestHeapWithRecorder(t *testing.T) {
metricRecorder := new(testMetricRecorder)
h := NewWithRecorder(testHeapObjectKeyFunc, compareInts, metricRecorder)
h.AddOrUpdate(mkHeapObj("foo", 10))
h.AddOrUpdate(mkHeapObj("bar", 1))
h.AddOrUpdate(mkHeapObj("baz", 100))
h.AddOrUpdate(mkHeapObj("qux", 11))
if *metricRecorder != 4 {
t.Errorf("expected count to be 4 but got %d", *metricRecorder)
}
if err := h.Delete(mkHeapObj("bar", 1)); err != nil {
t.Fatal(err)
}
if *metricRecorder != 3 {
t.Errorf("expected count to be 3 but got %d", *metricRecorder)
}
if _, err := h.Pop(); err != nil {
t.Fatal(err)
}
if *metricRecorder != 2 {
t.Errorf("expected count to be 2 but got %d", *metricRecorder)
}
h.metricRecorder.Clear()
if *metricRecorder != 0 {
t.Errorf("expected count to be 0 but got %d", *metricRecorder)
}
}

View File

@ -0,0 +1,160 @@
/*
Copyright 2025 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/karmada-io/karmada/pkg/scheduler/internal/heap"
metrics "github.com/karmada-io/karmada/pkg/scheduler/metrics/queue"
)
// ActiveQueue defines the interface of activeQ related operations.
type ActiveQueue interface {
Push(bindingInfo *QueuedBindingInfo)
Pop() (*QueuedBindingInfo, bool)
Len() int
Done(bindingInfo *QueuedBindingInfo)
Has(key string) bool
ShutDown()
}
// NewActiveQueue builds a instance of ActiveQueue.
func NewActiveQueue(metricRecorder metrics.MetricRecorder) ActiveQueue {
q := &activequeue{
activeBindings: heap.NewWithRecorder[*QueuedBindingInfo](BindingKeyFunc, Less, metricRecorder),
dirtyBindings: sets.Set[string]{},
processingBindings: sets.Set[string]{},
cond: sync.NewCond(&sync.Mutex{}),
}
return q
}
// activequeue is a priority work queue, which implements a ActiveQueue.
type activequeue struct {
// activeBindings defines the order in which we will work on items. Every
// element of queue should be in the dirtyBindings set and not in the
// processing set.
activeBindings *heap.Heap[*QueuedBindingInfo]
// dirtyBindings defines all of the items that need to be processed.
dirtyBindings sets.Set[string]
// Things that are currently being processed are in the processingBindings set.
// These things may be simultaneously in the dirtyBindings set. When we finish
// processingBindings something and remove it from this set, we'll check if
// it's in the dirtyBindings set, and if so, add it to the queue.
processingBindings sets.Set[string]
cond *sync.Cond
shuttingDown bool
}
// Push marks item as needing processing.
func (q *activequeue) Push(bindingInfo *QueuedBindingInfo) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirtyBindings.Has(bindingInfo.NamespacedKey) {
return
}
now := time.Now()
bindingInfo.Timestamp = now
if bindingInfo.InitialAttemptTimestamp == nil {
bindingInfo.InitialAttemptTimestamp = &now
}
q.dirtyBindings.Insert(bindingInfo.NamespacedKey)
if q.processingBindings.Has(bindingInfo.NamespacedKey) {
return
}
q.activeBindings.AddOrUpdate(bindingInfo)
q.cond.Signal()
}
// Len returns the current queue length, for informational purposes only. You
// shouldn't e.g. gate a call to Push() or Pop() on Len() being a particular
// value, that can't be synchronized properly.
func (q *activequeue) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.activeBindings.Len()
}
// Pop blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *activequeue) Pop() (bindingInfo *QueuedBindingInfo, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for q.activeBindings.Len() == 0 && !q.shuttingDown {
q.cond.Wait()
}
if q.activeBindings.Len() == 0 {
// We must be shutting down.
return nil, true
}
bindingInfo, _ = q.activeBindings.Pop()
bindingInfo.Attempts++
q.processingBindings.Insert(bindingInfo.NamespacedKey)
q.dirtyBindings.Delete(bindingInfo.NamespacedKey)
return bindingInfo, false
}
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *activequeue) Done(bindingInfo *QueuedBindingInfo) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.processingBindings.Delete(bindingInfo.NamespacedKey)
if q.dirtyBindings.Has(bindingInfo.NamespacedKey) {
bindingInfo.Timestamp = time.Now()
q.activeBindings.AddOrUpdate(bindingInfo)
q.cond.Signal()
} else if q.processingBindings.Len() == 0 {
q.cond.Signal()
}
}
// ShutDown will cause q to ignore all new items added to it and
// immediately instruct the worker goroutines to exit.
func (q *activequeue) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true
q.cond.Broadcast()
}
// Has inform if bindingInfo exists in the queue.
func (q *activequeue) Has(key string) bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.dirtyBindings.Has(key)
}

View File

@ -0,0 +1,65 @@
/*
Copyright 2025 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"testing"
)
func TestPriorityQueue(t *testing.T) {
tests := []struct {
name string
testFunc func() bool
}{
{
name: "Add b1, b2 and Get the binding with highest priority",
testFunc: func() bool {
b1 := &QueuedBindingInfo{NamespacedKey: "foo1", Priority: 2}
b2 := &QueuedBindingInfo{NamespacedKey: "foo2", Priority: 3}
queue := NewActiveQueue(nil)
queue.Push(b1)
queue.Push(b2)
item, shutdown := queue.Pop()
if shutdown {
return false
}
return item.NamespacedKey == b2.NamespacedKey
},
},
{
name: "Add b1, b2 and Get the binding which was enqueued first",
testFunc: func() bool {
b1 := &QueuedBindingInfo{NamespacedKey: "foo1"}
b2 := &QueuedBindingInfo{NamespacedKey: "foo2"}
queue := NewActiveQueue(nil)
queue.Push(b1)
queue.Push(b2)
item, shutdown := queue.Pop()
if shutdown {
return false
}
return item.NamespacedKey == b1.NamespacedKey
},
},
}
for _, tt := range tests {
success := tt.testFunc()
if !success {
t.Fatalf(`%q should be success, but get failed`, tt.name)
}
}
}

View File

@ -0,0 +1,385 @@
/*
Copyright 2025 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/pkg/scheduler/internal/heap"
metrics "github.com/karmada-io/karmada/pkg/scheduler/metrics/queue"
)
const (
// Scheduling queue names
activeQ = "Active"
backoffQ = "Backoff"
unschedulableBindings = "Unschedulable"
)
const (
// DefaultBindingMaxInUnschedulableBindingsDuration is the default value for the maximum
// time a binding can stay in unschedulableBindings. If a binding stays in unschedulableBindings
// for longer than this value, the binding will be moved from unschedulableBindings to
// backoffQ or activeQ. If this value is empty, the default value (5min)
// will be used.
DefaultBindingMaxInUnschedulableBindingsDuration = 5 * time.Minute
// DefaultBindingInitialBackoffDuration is the default value for the initial backoff duration
// for unschedulable bindings.
DefaultBindingInitialBackoffDuration = 1 * time.Second
// DefaultBindingMaxBackoffDuration is the default value for the max backoff duration
// for unschedulable bindings.
DefaultBindingMaxBackoffDuration = 10 * time.Second
)
// SchedulingQueue is an interface for a queue to store bindings waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
// Push pushes an new binding to activeQ.
Push(bindingInfo *QueuedBindingInfo)
// PushUnschedulableIfNotPresent pushes an unschedulable binding back to scheduling queue.
PushUnschedulableIfNotPresent(bindingInfo *QueuedBindingInfo)
// PushBackoffIfNotPresent pushes an failed binding back to scheduling queue.
PushBackoffIfNotPresent(bindingInfo *QueuedBindingInfo)
// Pop removes the head of the queue and returns it. It blocks if the
// queue is empty and waits until a new binding is added to the queue.
Pop() (*QueuedBindingInfo, bool)
// Done must be called for binding returned by Pop. This allows the queue to
// keep track of which bindings are currently being processed.
Done(bindingInfo *QueuedBindingInfo)
// Len returns the length of activeQ.
Len() int
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll remove it from backoffQ, but you still have to call `Done` on the queue.
Forget(bindingInfo *QueuedBindingInfo)
// Run starts the goroutines managing the queue.
Run()
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
}
type schedulingQueueOptions struct {
bindingInitialBackoffDuration time.Duration
bindingMaxBackoffDuration time.Duration
bindingMaxInUnschedulableBindingsDuration time.Duration
}
// Option configures a PriorityQueue
type Option func(*schedulingQueueOptions)
// WithBindingInitialBackoffDuration sets binding initial backoff duration for SchedulingQueue.
func WithBindingInitialBackoffDuration(duration time.Duration) Option {
return func(o *schedulingQueueOptions) {
o.bindingInitialBackoffDuration = duration
}
}
// WithBindingMaxBackoffDuration sets binding max backoff duration for SchedulingQueue.
func WithBindingMaxBackoffDuration(duration time.Duration) Option {
return func(o *schedulingQueueOptions) {
o.bindingMaxBackoffDuration = duration
}
}
// WithBindingMaxInUnschedulableBindingsDuration sets bindingMaxInUnschedulableBindingsDuration for SchedulingQueue.
func WithBindingMaxInUnschedulableBindingsDuration(duration time.Duration) Option {
return func(o *schedulingQueueOptions) {
o.bindingMaxInUnschedulableBindingsDuration = duration
}
}
var defaultSchedulingQueueOptions = schedulingQueueOptions{
bindingInitialBackoffDuration: DefaultBindingInitialBackoffDuration,
bindingMaxBackoffDuration: DefaultBindingMaxBackoffDuration,
bindingMaxInUnschedulableBindingsDuration: DefaultBindingMaxInUnschedulableBindingsDuration,
}
// NewSchedulingQueue builds a SchedulingQueue instance.
func NewSchedulingQueue(opts ...Option) SchedulingQueue {
options := defaultSchedulingQueueOptions
for _, opt := range opts {
opt(&options)
}
bq := &prioritySchedulingQueue{
stop: make(chan struct{}),
bindingInitialBackoffDuration: options.bindingInitialBackoffDuration,
bindingMaxBackoffDuration: options.bindingMaxBackoffDuration,
bindingMaxInUnschedulableBindingsDuration: options.bindingMaxInUnschedulableBindingsDuration,
activeQ: NewActiveQueue(metrics.NewActiveBindingsRecorder()),
backoffQ: heap.NewWithRecorder(BindingKeyFunc, Less, metrics.NewBackoffBindingsRecorder()),
unschedulableBindings: newUnschedulableBindings(metrics.NewUnschedulableBindingsRecorder()),
}
return bq
}
// prioritySchedulingQueue implements a SchedulingQueue.
// The head of PriorityQueue is the highest priority pending binding. This structure
// has two sub queues and a additional data structure, namely: activeQ,
// backoffQ and unschedulableBindings.
// - activeQ holds bindings that are being considered for scheduling.
// - backoffQ holds bindings that moved from unschedulableBindings and will move to
// activeQ when their backoff periods complete.
// - unschedulableBindings holds bindings that were already attempted for scheduling and
// are currently determined to be unschedulable.
type prioritySchedulingQueue struct {
// stopCh is used to stop the goroutine which periodically flushes pending bindings.
stop chan struct{}
// lock takes precedence and should be taken first,
// before any other locks in the queue.
lock sync.RWMutex
// binding initial backoff duration.
bindingInitialBackoffDuration time.Duration
// binding maximum backoff duration.
bindingMaxBackoffDuration time.Duration
// the maximum time a binding can stay in the unschedulableBindings.
bindingMaxInUnschedulableBindingsDuration time.Duration
// activeQ is a priority queue ordered by priority
activeQ ActiveQueue
// backoffQ is a heap ordered by backoff expiry. Bindings which have completed backoff
// are popped from this heap before the scheduler looks at activeQ.
backoffQ *heap.Heap[*QueuedBindingInfo]
// unschedulableBindings holds bindings that have been tried and determined unschedulable.
unschedulableBindings *UnschedulableBindings
}
// Run starts the goroutine to flush backoffQ and unschedulableBindings.
func (bq *prioritySchedulingQueue) Run() {
go wait.Until(bq.flushBackoffQCompleted, 1.0*time.Second, bq.stop)
go wait.Until(bq.flushUnschedulableBindingsLeftover, 30*time.Second, bq.stop)
}
// Close closes the scheduling queue.
func (bq *prioritySchedulingQueue) Close() {
bq.lock.Lock()
defer bq.lock.Unlock()
close(bq.stop)
bq.activeQ.ShutDown()
bq.unschedulableBindings.clear()
}
// flushBackoffQCompleted moves all bindings from backoffQ which have completed backoff in to activeQ
func (bq *prioritySchedulingQueue) flushBackoffQCompleted() {
bq.lock.Lock()
defer bq.lock.Unlock()
for {
bInfo, ok := bq.backoffQ.Peek()
if !ok || bInfo == nil {
break
}
if bq.isBindingBackingoff(bInfo) {
break
}
_, err := bq.backoffQ.Pop()
if err != nil {
klog.Error(err, "Unable to pop binding from backoff queue despite backoff completion", "binding", bInfo.NamespacedKey)
break
}
bq.moveToActiveQ(bInfo)
}
}
// isBindingBackingoff returns true if a binding is still waiting for its backoff timer.
// If this returns true, the binding should not be re-tried.
func (bq *prioritySchedulingQueue) isBindingBackingoff(bindingInfo *QueuedBindingInfo) bool {
boTime := bq.getBackoffTime(bindingInfo)
return boTime.After(time.Now())
}
// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the binding has made.
func (bq *prioritySchedulingQueue) calculateBackoffDuration(bindingInfo *QueuedBindingInfo) time.Duration {
if bindingInfo.Attempts == 0 {
// When the Binding hasn't experienced any scheduling attempts,
// they aren't obliged to get a backoff penalty at all.
return 0
}
duration := bq.bindingInitialBackoffDuration
for i := 1; i < bindingInfo.Attempts; i++ {
// Use subtraction instead of addition or multiplication to avoid overflow.
if duration > bq.bindingMaxBackoffDuration-duration {
return bq.bindingMaxBackoffDuration
}
duration += duration
}
return duration
}
// getBackoffTime returns the time that bindingInfo completes backoff
func (bq *prioritySchedulingQueue) getBackoffTime(bindingInfo *QueuedBindingInfo) time.Time {
duration := bq.calculateBackoffDuration(bindingInfo)
backoffTime := bindingInfo.Timestamp.Add(duration)
return backoffTime
}
// flushUnschedulableBindingsLeftover moves bindings which stay in unschedulableBindings
// longer than bindingMaxInUnschedulableBindingsDuration to activeQ.
func (bq *prioritySchedulingQueue) flushUnschedulableBindingsLeftover() {
bq.lock.Lock()
defer bq.lock.Unlock()
var bindingsToMove []*QueuedBindingInfo
currentTime := time.Now()
for _, bInfo := range bq.unschedulableBindings.bindingInfoMap {
lastScheduleTime := bInfo.Timestamp
if currentTime.Sub(lastScheduleTime) > bq.bindingMaxInUnschedulableBindingsDuration {
bindingsToMove = append(bindingsToMove, bInfo)
}
}
for _, bInfo := range bindingsToMove {
bq.moveToActiveQ(bInfo)
}
}
func (bq *prioritySchedulingQueue) Push(bindingInfo *QueuedBindingInfo) {
bq.lock.Lock()
defer bq.lock.Unlock()
bq.moveToActiveQ(bindingInfo)
}
// Pop removes the head of the active queue and returns it. It blocks if the
func (bq *prioritySchedulingQueue) Pop() (*QueuedBindingInfo, bool) {
// activeQ is empty and waits until a new item is added to the queue.
return bq.activeQ.Pop()
}
func (bq *prioritySchedulingQueue) PushUnschedulableIfNotPresent(bindingInfo *QueuedBindingInfo) {
bq.lock.Lock()
defer bq.lock.Unlock()
if bq.backoffQ.Has(bindingInfo) || bq.activeQ.Has(bindingInfo.NamespacedKey) {
return
}
bq.unschedulableBindings.addOrUpdate(bindingInfo)
klog.V(4).Info("Binding moved to an internal scheduling queue", "binding", bindingInfo.NamespacedKey, "queue", unschedulableBindings)
}
func (bq *prioritySchedulingQueue) PushBackoffIfNotPresent(bindingInfo *QueuedBindingInfo) {
bq.lock.Lock()
defer bq.lock.Unlock()
if bq.unschedulableBindings.get(bindingInfo.NamespacedKey) != nil || bq.activeQ.Has(bindingInfo.NamespacedKey) {
return
}
bq.backoffQ.AddOrUpdate(bindingInfo)
klog.V(4).Info("Binding moved to an internal scheduling queue", "binding", bindingInfo.NamespacedKey, "queue", backoffQ)
}
// Done must be called for binding returned by Pop. This allows the queue to
// keep track of which bindings are currently being processed.
func (bq *prioritySchedulingQueue) Done(bindingInfo *QueuedBindingInfo) {
bq.activeQ.Done(bindingInfo)
}
func (bq *prioritySchedulingQueue) Len() int {
return bq.activeQ.Len()
}
func (bq *prioritySchedulingQueue) Forget(bindingInfo *QueuedBindingInfo) {
bq.lock.Lock()
defer bq.lock.Unlock()
_ = bq.backoffQ.Delete(bindingInfo)
}
// moveToActiveQ tries to add binding to active queue and remove it from unschedulable and backoff queues.
func (bq *prioritySchedulingQueue) moveToActiveQ(bindingInfo *QueuedBindingInfo) {
bq.activeQ.Push(bindingInfo)
_ = bq.backoffQ.Delete(bindingInfo) // just ignore this not-found error
bq.unschedulableBindings.delete(bindingInfo.NamespacedKey)
klog.V(4).Info("Binding moved to an internal scheduling queue", "binding", bindingInfo.NamespacedKey, "queue", activeQ)
}
// UnschedulableBindings holds bindings that cannot be scheduled. This data structure
// is used to implement unschedulableBindings.
type UnschedulableBindings struct {
// bindingInfoMap is a map key by a binding's full-name and the value is a pointer to the QueuedBindingInfo.
bindingInfoMap map[string]*QueuedBindingInfo
// unschedulableRecorder updates the counter when elements of an bindingInfoMap
// get added or removed, and it does nothing if it's nil.
unschedulableRecorder metrics.MetricRecorder
}
// addOrUpdate adds a binding to the unschedulable bindingInfoMap.
func (u *UnschedulableBindings) addOrUpdate(bindingInfo *QueuedBindingInfo) {
if _, exists := u.bindingInfoMap[bindingInfo.NamespacedKey]; !exists {
if u.unschedulableRecorder != nil {
u.unschedulableRecorder.Inc()
}
}
u.bindingInfoMap[bindingInfo.NamespacedKey] = bindingInfo
}
// delete deletes a binding from the unschedulable bindingInfoMap.
func (u *UnschedulableBindings) delete(bindingKey string) {
if _, exists := u.bindingInfoMap[bindingKey]; exists {
if u.unschedulableRecorder != nil {
u.unschedulableRecorder.Dec()
}
}
delete(u.bindingInfoMap, bindingKey)
}
// get returns the QueuedBindingInfo if a binding with the same key as the key of the given "binding"
// is found in the map. It returns nil otherwise.
func (u *UnschedulableBindings) get(bindingKey string) *QueuedBindingInfo {
if bindingInfo, exists := u.bindingInfoMap[bindingKey]; exists {
return bindingInfo
}
return nil
}
// clear removes all the entries from the unschedulable bindingInfoMap.
func (u *UnschedulableBindings) clear() {
u.bindingInfoMap = make(map[string]*QueuedBindingInfo)
if u.unschedulableRecorder != nil {
u.unschedulableRecorder.Clear()
}
}
// newUnschedulableBindings initializes a new object of UnschedulableBindings.
func newUnschedulableBindings(unschedulableRecorder metrics.MetricRecorder) *UnschedulableBindings {
return &UnschedulableBindings{
bindingInfoMap: make(map[string]*QueuedBindingInfo),
unschedulableRecorder: unschedulableRecorder,
}
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2025 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"time"
)
// QueuedBindingInfo is a Binding wrapper with additional information related to
// the binding's status in the scheduling queue, such as the timestamp when
// it's added to the queue.
type QueuedBindingInfo struct {
NamespacedKey string
// The priority of ResourceBinding.
Priority int32
// The time binding added to the scheduling queue.
Timestamp time.Time
// The time when the binding is added to the queue for the first time. The binding may be added
// back to the queue multiple times before it's successfully scheduled.
// It shouldn't be updated once initialized.
InitialAttemptTimestamp *time.Time
// Number of schedule attempts before successfully scheduled.
// It's used to record the attempts metric and calculate the backoff time this Binding is obliged to get before retrying.
Attempts int
}
// DeepCopy returns a deep copy of the QueuedBindingInfo object.
func (qbi *QueuedBindingInfo) DeepCopy() *QueuedBindingInfo {
return &QueuedBindingInfo{
NamespacedKey: qbi.NamespacedKey,
Priority: qbi.Priority,
Timestamp: qbi.Timestamp,
Attempts: qbi.Attempts,
InitialAttemptTimestamp: qbi.InitialAttemptTimestamp,
}
}
// BindingKeyFunc is the key mapping function of QueuedBindingInfo.
func BindingKeyFunc(bindingInfo *QueuedBindingInfo) string {
return bindingInfo.NamespacedKey
}
// Less is the function used by the activeQ heap algorithm to sort bindings.
// It sorts bindings based on their priority. When priorities are equal, it uses
// QueuedBindingInfo.timestamp.
func Less(bInfo1, bInfo2 *QueuedBindingInfo) bool {
p1 := bInfo1.Priority
p2 := bInfo2.Priority
return (p1 > p2) || (p1 == p2 && bInfo1.Timestamp.Before(bInfo2.Timestamp))
}

View File

@ -0,0 +1,103 @@
/*
Copyright 2025 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"sync/atomic"
"testing"
)
var _ MetricRecorder = &fakeBindingsRecorder{}
type fakeBindingsRecorder struct {
counter int64
}
func (r *fakeBindingsRecorder) Inc() {
atomic.AddInt64(&r.counter, 1)
}
func (r *fakeBindingsRecorder) Dec() {
atomic.AddInt64(&r.counter, -1)
}
func (r *fakeBindingsRecorder) Clear() {
atomic.StoreInt64(&r.counter, 0)
}
func TestInc(t *testing.T) {
fakeRecorder := fakeBindingsRecorder{}
var wg sync.WaitGroup
loops := 100
wg.Add(loops)
for i := 0; i < loops; i++ {
go func() {
fakeRecorder.Inc()
wg.Done()
}()
}
wg.Wait()
if fakeRecorder.counter != int64(loops) {
t.Errorf("Expected %v, got %v", loops, fakeRecorder.counter)
}
}
func TestDec(t *testing.T) {
fakeRecorder := fakeBindingsRecorder{counter: 100}
var wg sync.WaitGroup
loops := 100
wg.Add(loops)
for i := 0; i < loops; i++ {
go func() {
fakeRecorder.Dec()
wg.Done()
}()
}
wg.Wait()
if fakeRecorder.counter != int64(0) {
t.Errorf("Expected %v, got %v", loops, fakeRecorder.counter)
}
}
func TestClear(t *testing.T) {
fakeRecorder := fakeBindingsRecorder{}
var wg sync.WaitGroup
incLoops, decLoops := 100, 80
wg.Add(incLoops + decLoops)
for i := 0; i < incLoops; i++ {
go func() {
fakeRecorder.Inc()
wg.Done()
}()
}
for i := 0; i < decLoops; i++ {
go func() {
fakeRecorder.Dec()
wg.Done()
}()
}
wg.Wait()
if fakeRecorder.counter != int64(incLoops-decLoops) {
t.Errorf("Expected %v, got %v", incLoops-decLoops, fakeRecorder.counter)
}
// verify Clear() works
fakeRecorder.Clear()
if fakeRecorder.counter != int64(0) {
t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter)
}
}

View File

@ -0,0 +1,121 @@
/*
Copyright 2025 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
// SchedulerSubsystem - subsystem name used by scheduler.
SchedulerSubsystem = "scheduler"
)
// All the histogram based metrics have 1ms as size for the smallest bucket.
var (
pendingBindings = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: SchedulerSubsystem,
Name: "pending_bindings",
Help: "Number of pending bindings, by the queue type. 'active' means number of bindings in activeQ; 'backoff' means number of bindings in backoffQ; 'unschedulable' means number of bindings in unschedulableBindings that the scheduler attempted to schedule and failed.",
StabilityLevel: metrics.ALPHA,
}, []string{"queue"})
metricsList = []metrics.Registerable{
pendingBindings,
}
)
func init() {
// Register the metrics.
RegisterMetrics(metricsList...)
}
// RegisterMetrics registers a list of metrics.
// This function is exported because it is intended to be used by out-of-tree plugins to register their custom metrics.
func RegisterMetrics(extraMetrics ...metrics.Registerable) {
for _, metric := range extraMetrics {
legacyregistry.MustRegister(metric)
}
}
// ActiveBindings returns the pending bindings metrics with the label active
func ActiveBindings() metrics.GaugeMetric {
return pendingBindings.With(metrics.Labels{"queue": "active"})
}
// BackoffBindings returns the pending bindings metrics with the label backoff
func BackoffBindings() metrics.GaugeMetric {
return pendingBindings.With(metrics.Labels{"queue": "backoff"})
}
// UnschedulableBindings returns the pending bindings metrics with the label unschedulable
func UnschedulableBindings() metrics.GaugeMetric {
return pendingBindings.With(metrics.Labels{"queue": "unschedulable"})
}
// MetricRecorder represents a metric recorder which takes action when the
// metric Inc(), Dec() and Clear()
type MetricRecorder interface {
Inc()
Dec()
Clear()
}
var _ MetricRecorder = &PendingBindingsRecorder{}
// PendingBindingsRecorder is an implementation of MetricRecorder
type PendingBindingsRecorder struct {
recorder metrics.GaugeMetric
}
// NewActiveBindingsRecorder returns ActiveBindings in a Prometheus metric fashion
func NewActiveBindingsRecorder() *PendingBindingsRecorder {
return &PendingBindingsRecorder{
recorder: ActiveBindings(),
}
}
// NewUnschedulableBindingsRecorder returns UnschedulableBindings in a Prometheus metric fashion
func NewUnschedulableBindingsRecorder() *PendingBindingsRecorder {
return &PendingBindingsRecorder{
recorder: UnschedulableBindings(),
}
}
// NewBackoffBindingsRecorder returns BackoffBindings in a Prometheus metric fashion
func NewBackoffBindingsRecorder() *PendingBindingsRecorder {
return &PendingBindingsRecorder{
recorder: BackoffBindings(),
}
}
// Inc increases a metric counter by 1, in an atomic way
func (r *PendingBindingsRecorder) Inc() {
r.recorder.Inc()
}
// Dec decreases a metric counter by 1, in an atomic way
func (r *PendingBindingsRecorder) Dec() {
r.recorder.Dec()
}
// Clear set a metric counter to 0, in an atomic way
func (r *PendingBindingsRecorder) Clear() {
r.recorder.Set(float64(0))
}

View File

@ -44,6 +44,7 @@ import (
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/features"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
@ -53,6 +54,7 @@ import (
"github.com/karmada-io/karmada/pkg/scheduler/framework"
frameworkplugins "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
internalqueue "github.com/karmada-io/karmada/pkg/scheduler/internal/queue"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
@ -95,9 +97,12 @@ type Scheduler struct {
// clusterReconcileWorker reconciles cluster changes to trigger corresponding
// ResourceBinding/ClusterResourceBinding rescheduling.
clusterReconcileWorker util.AsyncWorker
// TODO: implement a priority scheduling queue
// queue is the legacy rate limiting queue which will be replaced by priorityQueue
// in the future releases.
queue workqueue.TypedRateLimitingInterface[any]
priorityQueue internalqueue.SchedulingQueue
Algorithm core.ScheduleAlgorithm
schedulerCache schedulercache.Cache
@ -239,7 +244,13 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
for _, opt := range opts {
opt(&options)
}
queue := workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](options.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{Name: "scheduler-queue"})
var legacyQueue workqueue.TypedRateLimitingInterface[any]
var priorityQueue internalqueue.SchedulingQueue
if features.FeatureGate.Enabled(features.PriorityBasedScheduling) {
priorityQueue = internalqueue.NewSchedulingQueue()
} else {
legacyQueue = workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](options.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{Name: "scheduler-queue"})
}
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.outOfTreeRegistry); err != nil {
return nil, err
@ -258,7 +269,8 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
clusterBindingLister: clusterBindingLister,
clusterLister: clusterLister,
informerFactory: factory,
queue: queue,
queue: legacyQueue,
priorityQueue: priorityQueue,
Algorithm: algorithm,
schedulerCache: schedulerCache,
}
@ -310,9 +322,15 @@ func (s *Scheduler) Run(ctx context.Context) {
go wait.Until(s.worker, time.Second, stopCh)
if features.FeatureGate.Enabled(features.PriorityBasedScheduling) {
s.priorityQueue.Run()
<-stopCh
s.priorityQueue.Close()
} else {
<-stopCh
s.queue.ShutDown()
}
}
func (s *Scheduler) worker() {
for s.scheduleNext() {
@ -320,6 +338,17 @@ func (s *Scheduler) worker() {
}
func (s *Scheduler) scheduleNext() bool {
if features.FeatureGate.Enabled(features.PriorityBasedScheduling) {
bindingInfo, shutdown := s.priorityQueue.Pop()
if shutdown {
klog.Errorf("Fail to pop item from priorityQueue")
return false
}
defer s.priorityQueue.Done(bindingInfo)
err := s.doSchedule(bindingInfo.NamespacedKey)
s.handleErr(err, bindingInfo)
} else {
key, shutdown := s.queue.Get()
if shutdown {
klog.Errorf("Fail to pop item from queue")
@ -328,7 +357,8 @@ func (s *Scheduler) scheduleNext() bool {
defer s.queue.Done(key)
err := s.doSchedule(key.(string))
s.handleErr(err, key)
s.legacyHandleErr(err, key)
}
return true
}
@ -759,12 +789,26 @@ func (s *Scheduler) patchScheduleResultForClusterResourceBinding(oldBinding *wor
return nil
}
func (s *Scheduler) handleErr(err error, key interface{}) {
func (s *Scheduler) handleErr(err error, bindingInfo *internalqueue.QueuedBindingInfo) {
if err == nil || apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
s.priorityQueue.Forget(bindingInfo)
return
}
var unschedulableErr *framework.UnschedulableError
if !errors.As(err, &unschedulableErr) {
s.priorityQueue.PushUnschedulableIfNotPresent(bindingInfo)
} else {
s.priorityQueue.PushBackoffIfNotPresent(bindingInfo)
}
metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure)
}
func (s *Scheduler) legacyHandleErr(err error, key interface{}) {
if err == nil || apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
s.queue.Forget(key)
return
}
s.queue.AddRateLimited(key)
metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure)
}

View File

@ -25,10 +25,13 @@ import (
"k8s.io/apimachinery/pkg/runtime"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/metrics/testutil"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/features"
karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake"
internalqueue "github.com/karmada-io/karmada/pkg/scheduler/internal/queue"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
)
@ -69,10 +72,18 @@ var (
scheduler.onClusterResourceBindingRequeue(crb, metrics.ClusterChanged)
}
scheduleAttemptSuccess = func(scheduler *Scheduler, obj interface{}) {
scheduler.handleErr(nil, obj)
rb := obj.(*workv1alpha2.ResourceBinding)
scheduler.handleErr(nil, &internalqueue.QueuedBindingInfo{
NamespacedKey: cache.ObjectName{Namespace: rb.Namespace, Name: rb.Namespace}.String(),
Priority: rb.Spec.SchedulePriorityValue(),
})
}
scheduleAttemptFailure = func(scheduler *Scheduler, obj interface{}) {
scheduler.handleErr(fmt.Errorf("schedule attempt failure"), obj)
rb := obj.(*workv1alpha2.ResourceBinding)
scheduler.handleErr(fmt.Errorf("schedule attempt failure"), &internalqueue.QueuedBindingInfo{
NamespacedKey: cache.ObjectName{Namespace: rb.Namespace, Name: rb.Namespace}.String(),
Priority: rb.Spec.SchedulePriorityValue(),
})
}
)
@ -81,6 +92,8 @@ func TestIncomingBindingMetrics(t *testing.T) {
karmadaClient := karmadafake.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
// enable "PriorityBasedScheduling" feature gate.
_ = features.FeatureGate.Set("PriorityBasedScheduling=true")
sche, err := NewScheduler(dynamicClient, karmadaClient, kubeClient)
if err != nil {
t.Errorf("create scheduler error: %s", err)

View File

@ -35,14 +35,15 @@ import (
"k8s.io/client-go/kubernetes/scheme"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/features"
karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake"
workv1alpha2lister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/scheduler/core"
schedulercore "github.com/karmada-io/karmada/pkg/scheduler/core"
internalqueue "github.com/karmada-io/karmada/pkg/scheduler/internal/queue"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/grpcconnection"
@ -1065,26 +1066,31 @@ func TestWorkerAndScheduleNext(t *testing.T) {
testCases := []struct {
name string
key string
priority int32
shutdown bool
expectResult bool
}{
{
name: "Schedule ResourceBinding",
key: "default/test-binding",
priority: 10,
shutdown: false,
expectResult: true,
},
{
name: "Schedule ClusterResourceBinding",
key: "test-cluster-binding",
priority: 5,
shutdown: false,
expectResult: true,
},
}
// enable "PriorityBasedScheduling" feature gate.
_ = features.FeatureGate.Set("PriorityBasedScheduling=true")
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
queue := workqueue.NewTypedRateLimitingQueue[any](workqueue.DefaultTypedControllerRateLimiter[any]())
queue := internalqueue.NewSchedulingQueue()
bindingLister := &fakeBindingLister{binding: resourceBinding}
clusterBindingLister := &fakeClusterBindingLister{binding: clusterResourceBinding}
@ -1103,17 +1109,20 @@ func TestWorkerAndScheduleNext(t *testing.T) {
s := &Scheduler{
KarmadaClient: fakeClient,
queue: queue,
priorityQueue: queue,
bindingLister: bindingLister,
clusterBindingLister: clusterBindingLister,
Algorithm: mockAlgo,
eventRecorder: eventRecorder,
}
s.queue.Add(tc.key)
s.priorityQueue.Push(&internalqueue.QueuedBindingInfo{
NamespacedKey: tc.key,
Priority: tc.priority,
})
if tc.shutdown {
s.queue.ShutDown()
s.priorityQueue.Close()
}
result := s.scheduleNext()
@ -1121,7 +1130,7 @@ func TestWorkerAndScheduleNext(t *testing.T) {
assert.Equal(t, tc.expectResult, result, "scheduleNext return value mismatch")
if !tc.shutdown {
assert.Equal(t, 0, s.queue.Len(), "Queue should be empty after processing")
assert.Equal(t, 0, s.priorityQueue.Len(), "Queue should be empty after processing")
}
})
}
@ -1224,6 +1233,7 @@ func TestCreateScheduler(t *testing.T) {
enableEmptyWorkloadPropagation bool
plugins []string
rateLimiterOptions ratelimiterflag.Options
enablePriorityBasedScheduling bool
}{
{
name: "scheduler with default configuration",
@ -1313,11 +1323,11 @@ func TestCreateScheduler(t *testing.T) {
plugins: mockPlugins,
},
{
name: "scheduler with RateLimiterOptions",
name: "scheduler with PriorityBasedScheduling enabled",
opts: []Option{
WithRateLimiterOptions(mockRateLimiterOptions),
},
rateLimiterOptions: mockRateLimiterOptions,
enablePriorityBasedScheduling: true,
},
}
@ -1358,8 +1368,8 @@ func TestCreateScheduler(t *testing.T) {
if len(tc.plugins) > 0 && sche.Algorithm == nil {
t.Errorf("expected Algorithm to be set when plugins are provided")
}
if tc.rateLimiterOptions != (ratelimiterflag.Options{}) && sche.queue == nil {
t.Errorf("expected queue to be set when rate limiter options are provided")
if tc.enablePriorityBasedScheduling && sche.priorityQueue == nil {
t.Errorf("expected priorityQueue to be set when feature gate %q is enabled", features.PriorityBasedScheduling)
}
})
}