diff --git a/pkg/apis/work/v1alpha2/binding_types_helper.go b/pkg/apis/work/v1alpha2/binding_types_helper.go index e89723c20..dc764840f 100644 --- a/pkg/apis/work/v1alpha2/binding_types_helper.go +++ b/pkg/apis/work/v1alpha2/binding_types_helper.go @@ -204,3 +204,12 @@ func (s *ResourceBindingSpec) SchedulingSuspended() bool { return *s.Suspension.Scheduling } + +// SchedulePriorityValue returns the scheduling priority declared +// by '.spec.SchedulePriority.Priority'. +func (s *ResourceBindingSpec) SchedulePriorityValue() int32 { + if s.SchedulePriority == nil { + return 0 + } + return s.SchedulePriority.Priority +} diff --git a/pkg/scheduler/event_handler.go b/pkg/scheduler/event_handler.go index 5abddb1f7..efccc0bbe 100644 --- a/pkg/scheduler/event_handler.go +++ b/pkg/scheduler/event_handler.go @@ -33,6 +33,8 @@ import ( clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/features" + internalqueue "github.com/karmada-io/karmada/pkg/scheduler/internal/queue" "github.com/karmada-io/karmada/pkg/scheduler/metrics" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer" @@ -117,14 +119,41 @@ func (s *Scheduler) resourceBindingEventFilter(obj interface{}) bool { util.GetLabelValue(accessor.GetLabels(), workv1alpha2.BindingManagedByLabel) != "" } -func (s *Scheduler) onResourceBindingAdd(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - klog.Errorf("couldn't get key for object %#v: %v", obj, err) - return +func newQueuedBindingInfo(obj interface{}) *internalqueue.QueuedBindingInfo { + switch t := obj.(type) { + case *workv1alpha2.ResourceBinding: + return &internalqueue.QueuedBindingInfo{ + NamespacedKey: cache.ObjectName{Namespace: t.GetNamespace(), Name: t.GetName()}.String(), + Priority: t.Spec.SchedulePriorityValue(), + } + case *workv1alpha2.ClusterResourceBinding: + return &internalqueue.QueuedBindingInfo{ + NamespacedKey: t.GetName(), + Priority: t.Spec.SchedulePriorityValue(), + } } - s.queue.Add(key) + return nil +} + +func (s *Scheduler) onResourceBindingAdd(obj interface{}) { + if features.FeatureGate.Enabled(features.PriorityBasedScheduling) { + bindingInfo := newQueuedBindingInfo(obj) + if bindingInfo == nil { + // shouldn't happen + klog.Errorf("couldn't convert to QueuedBindingInfo %#v", obj) + return + } + + s.priorityQueue.Push(bindingInfo) + } else { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + klog.Errorf("couldn't get key for object %#v: %v", obj, err) + return + } + s.queue.Add(key) + } metrics.CountSchedulerBindings(metrics.BindingAdd) } @@ -150,35 +179,62 @@ func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) { return } - key, err := cache.MetaNamespaceKeyFunc(cur) - if err != nil { - klog.Errorf("couldn't get key for object %#v: %v", cur, err) - return - } + if features.FeatureGate.Enabled(features.PriorityBasedScheduling) { + bindingInfo := newQueuedBindingInfo(cur) + if bindingInfo == nil { + // shouldn't happen + klog.Errorf("couldn't convert to QueuedBindingInfo %#v", cur) + return + } - s.queue.Add(key) + s.priorityQueue.Push(bindingInfo) + } else { + key, err := cache.MetaNamespaceKeyFunc(cur) + if err != nil { + klog.Errorf("couldn't get key for object %#v: %v", cur, err) + return + } + + s.queue.Add(key) + } metrics.CountSchedulerBindings(metrics.BindingUpdate) } func (s *Scheduler) onResourceBindingRequeue(binding *workv1alpha2.ResourceBinding, event string) { - key, err := cache.MetaNamespaceKeyFunc(binding) - if err != nil { - klog.Errorf("couldn't get key for ResourceBinding(%s/%s): %v", binding.Namespace, binding.Name, err) - return - } klog.Infof("Requeue ResourceBinding(%s/%s) due to event(%s).", binding.Namespace, binding.Name, event) - s.queue.Add(key) + if features.FeatureGate.Enabled(features.PriorityBasedScheduling) { + s.priorityQueue.Push(&internalqueue.QueuedBindingInfo{ + NamespacedKey: cache.ObjectName{Namespace: binding.Namespace, Name: binding.Name}.String(), + Priority: binding.Spec.SchedulePriorityValue(), + }) + } else { + key, err := cache.MetaNamespaceKeyFunc(binding) + if err != nil { + klog.Errorf("couldn't get key for ResourceBinding(%s/%s): %v", binding.Namespace, binding.Name, err) + return + } + klog.Infof("Requeue ResourceBinding(%s/%s) due to event(%s).", binding.Namespace, binding.Name, event) + s.queue.Add(key) + } metrics.CountSchedulerBindings(event) } func (s *Scheduler) onClusterResourceBindingRequeue(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, event string) { - key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding) - if err != nil { - klog.Errorf("couldn't get key for ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err) - return - } klog.Infof("Requeue ClusterResourceBinding(%s) due to event(%s).", clusterResourceBinding.Name, event) - s.queue.Add(key) + if features.FeatureGate.Enabled(features.PriorityBasedScheduling) { + s.priorityQueue.Push(&internalqueue.QueuedBindingInfo{ + NamespacedKey: clusterResourceBinding.Name, + Priority: clusterResourceBinding.Spec.SchedulePriorityValue(), + }) + } else { + key, err := cache.MetaNamespaceKeyFunc(clusterResourceBinding) + if err != nil { + klog.Errorf("couldn't get key for ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err) + return + } + klog.Infof("Requeue ClusterResourceBinding(%s) due to event(%s).", clusterResourceBinding.Name, event) + s.queue.Add(key) + } metrics.CountSchedulerBindings(event) } diff --git a/pkg/scheduler/internal/heap/heap.go b/pkg/scheduler/internal/heap/heap.go new file mode 100755 index 000000000..86b38da19 --- /dev/null +++ b/pkg/scheduler/internal/heap/heap.go @@ -0,0 +1,249 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Below is the implementation of the a heap. The logic is pretty much the same +// as cache.heap, however, this heap does not perform synchronization. It leaves +// synchronization to the SchedulingQueue. + +// This code is basically lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package. +// For reference: +// https://github.com/kubernetes/kubernetes/blob/release-1.32/pkg/scheduler/backend/heap/heap.go + +package heap + +import ( + "container/heap" + "fmt" + + metrics "github.com/karmada-io/karmada/pkg/scheduler/metrics/queue" +) + +// KeyFunc is a function type to get the key from an object. +type KeyFunc[T any] func(obj T) string + +type heapItem[T any] struct { + obj T // The object which is stored in the heap. + index int // The index of the object's key in the Heap.queue. +} + +type itemKeyValue[T any] struct { + key string + obj T +} + +// data is an internal struct that implements the standard heap interface +// and keeps the data stored in the heap. +type data[T any] struct { + // items is a map from key of the objects to the objects and their index. + // We depend on the property that items in the map are in the queue and vice versa. + items map[string]*heapItem[T] + // queue implements a heap data structure and keeps the order of elements + // according to the heap invariant. The queue keeps the keys of objects stored + // in "items". + queue []string + + // keyFunc is used to make the key used for queued item insertion and retrieval, and + // should be deterministic. + keyFunc KeyFunc[T] + // lessFunc is used to compare two objects in the heap. + lessFunc LessFunc[T] +} + +var ( + _ = heap.Interface(&data[any]{}) // heapData is a standard heap +) + +// Less compares two objects and returns true if the first one should go +// in front of the second one in the heap. +func (h *data[T]) Less(i, j int) bool { + if i > len(h.queue) || j > len(h.queue) { + return false + } + itemi, ok := h.items[h.queue[i]] + if !ok { + return false + } + itemj, ok := h.items[h.queue[j]] + if !ok { + return false + } + return h.lessFunc(itemi.obj, itemj.obj) +} + +// Len returns the number of items in the Heap. +func (h *data[T]) Len() int { return len(h.queue) } + +// Swap implements swapping of two elements in the heap. This is a part of standard +// heap interface and should never be called directly. +func (h *data[T]) Swap(i, j int) { + if i < 0 || j < 0 { + return + } + h.queue[i], h.queue[j] = h.queue[j], h.queue[i] + item := h.items[h.queue[i]] + item.index = i + item = h.items[h.queue[j]] + item.index = j +} + +// Push is supposed to be called by container/heap.Push only. +func (h *data[T]) Push(kv interface{}) { + keyValue := kv.(*itemKeyValue[T]) + n := len(h.queue) + h.items[keyValue.key] = &heapItem[T]{keyValue.obj, n} + h.queue = append(h.queue, keyValue.key) +} + +// Pop is supposed to be called by container/heap.Pop only. +func (h *data[T]) Pop() interface{} { + if len(h.queue) == 0 { + return nil + } + key := h.queue[len(h.queue)-1] + h.queue = h.queue[0 : len(h.queue)-1] + item, ok := h.items[key] + if !ok { + // This is an error + return nil + } + delete(h.items, key) + return item.obj +} + +// Peek returns the head of the heap without removing it. +func (h *data[T]) Peek() (T, bool) { + if len(h.queue) > 0 { + return h.items[h.queue[0]].obj, true + } + var zero T + return zero, false +} + +// Heap is a producer/consumer queue that implements a heap data structure. +// It can be used to implement priority queues and similar data structures. +type Heap[T any] struct { + // data stores objects and has a queue that keeps their ordering according + // to the heap invariant. + data *data[T] + // metricRecorder updates the counter when elements of a heap get added or + // removed, and it does nothing if it's nil + metricRecorder metrics.MetricRecorder +} + +// AddOrUpdate inserts an item, and puts it in the queue. The item is updated if it +// already exists. +func (h *Heap[T]) AddOrUpdate(obj T) { + key := h.data.keyFunc(obj) + if _, exists := h.data.items[key]; exists { + h.data.items[key].obj = obj + heap.Fix(h.data, h.data.items[key].index) + } else { + heap.Push(h.data, &itemKeyValue[T]{key, obj}) + if h.metricRecorder != nil { + h.metricRecorder.Inc() + } + } +} + +// Delete removes an item. +func (h *Heap[T]) Delete(obj T) error { + key := h.data.keyFunc(obj) + if item, ok := h.data.items[key]; ok { + heap.Remove(h.data, item.index) + if h.metricRecorder != nil { + h.metricRecorder.Dec() + } + return nil + } + return fmt.Errorf("object not found") +} + +// Peek returns the head of the heap without removing it. +func (h *Heap[T]) Peek() (T, bool) { + return h.data.Peek() +} + +// Pop returns the head of the heap and removes it. +func (h *Heap[T]) Pop() (T, error) { + obj := heap.Pop(h.data) + if obj != nil { + if h.metricRecorder != nil { + h.metricRecorder.Dec() + } + return obj.(T), nil + } + var zero T + return zero, fmt.Errorf("heap is empty") +} + +// Get returns the requested item, or sets exists=false. +func (h *Heap[T]) Get(obj T) (T, bool) { + key := h.data.keyFunc(obj) + return h.GetByKey(key) +} + +// GetByKey returns the requested item, or sets exists=false. +func (h *Heap[T]) GetByKey(key string) (T, bool) { + item, exists := h.data.items[key] + if !exists { + var zero T + return zero, false + } + return item.obj, true +} + +// Has checks if obj exists. +func (h *Heap[T]) Has(obj T) bool { + key := h.data.keyFunc(obj) + _, ok := h.GetByKey(key) + return ok +} + +// List returns a list of all the items. +func (h *Heap[T]) List() []T { + list := make([]T, 0, len(h.data.items)) + for _, item := range h.data.items { + list = append(list, item.obj) + } + return list +} + +// Len returns the number of items in the heap. +func (h *Heap[T]) Len() int { + return len(h.data.queue) +} + +// New returns a Heap which can be used to queue up items to process. +func New[T any](keyFn KeyFunc[T], lessFn LessFunc[T]) *Heap[T] { + return NewWithRecorder(keyFn, lessFn, nil) +} + +// NewWithRecorder wraps an optional metricRecorder to compose a Heap object. +func NewWithRecorder[T any](keyFn KeyFunc[T], lessFn LessFunc[T], metricRecorder metrics.MetricRecorder) *Heap[T] { + return &Heap[T]{ + data: &data[T]{ + items: map[string]*heapItem[T]{}, + queue: []string{}, + keyFunc: keyFn, + lessFunc: lessFn, + }, + metricRecorder: metricRecorder, + } +} + +// LessFunc is a function that receives two items and returns true if the first +// item should be placed before the second one when the list is sorted. +type LessFunc[T any] func(item1, item2 T) bool diff --git a/pkg/scheduler/internal/heap/heap_test.go b/pkg/scheduler/internal/heap/heap_test.go new file mode 100755 index 000000000..66f4a89f6 --- /dev/null +++ b/pkg/scheduler/internal/heap/heap_test.go @@ -0,0 +1,307 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file was copied from client-go/tools/cache/heap.go and modified +// for our non thread-safe heap + +// This code is basically lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package. +// For reference: +// https://github.com/kubernetes/kubernetes/blob/release-1.32/pkg/scheduler/backend/heap/heap_test.go + +package heap + +import ( + "testing" +) + +func testHeapObjectKeyFunc(obj testHeapObject) string { + return obj.name +} + +type testHeapObject struct { + name string + val interface{} +} + +type testMetricRecorder int + +func (tmr *testMetricRecorder) Inc() { + if tmr != nil { + *tmr++ + } +} + +func (tmr *testMetricRecorder) Dec() { + if tmr != nil { + *tmr-- + } +} + +func (tmr *testMetricRecorder) Clear() { + if tmr != nil { + *tmr = 0 + } +} + +func mkHeapObj(name string, val interface{}) testHeapObject { + return testHeapObject{name: name, val: val} +} + +func compareInts(val1 testHeapObject, val2 testHeapObject) bool { + first := val1.val.(int) + second := val2.val.(int) + return first < second +} + +// TestHeapBasic tests Heap invariant +func TestHeapBasic(t *testing.T) { + h := New(testHeapObjectKeyFunc, compareInts) + const amount = 500 + var i int + var zero testHeapObject + + // empty queue + if item, ok := h.Peek(); ok || item != zero { + t.Errorf("expected nil object but got %v", item) + } + + for i = amount; i > 0; i-- { + h.AddOrUpdate(mkHeapObj(string([]rune{'a', rune(i)}), i)) + // Retrieve head without removing it + head, ok := h.Peek() + if e, a := i, head.val; !ok || a != e { + t.Errorf("expected %d, got %d", e, a) + } + } + + // Make sure that the numbers are popped in ascending order. + prevNum := 0 + for i := 0; i < amount; i++ { + item, err := h.Pop() + num := item.val.(int) + // All the items must be sorted. + if err != nil || prevNum > num { + t.Errorf("got %v out of order, last was %v", item, prevNum) + } + prevNum = num + } + + _, err := h.Pop() + if err == nil { + t.Errorf("expected Pop() to error on empty heap") + } +} + +// TestHeap_AddOrUpdate_Add tests add capabilities of Heap.AddOrUpdate +// and ensures that heap invariant is preserved after adding items. +func TestHeap_AddOrUpdate_Add(t *testing.T) { + h := New(testHeapObjectKeyFunc, compareInts) + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("baz", 11)) + h.AddOrUpdate(mkHeapObj("zab", 30)) + h.AddOrUpdate(mkHeapObj("foo", 13)) // This updates "foo". + + item, err := h.Pop() + if e, a := 1, item.val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 11, item.val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if err := h.Delete(mkHeapObj("baz", 11)); err == nil { // Nothing is deleted. + t.Fatalf("nothing should be deleted from the heap") + } + h.AddOrUpdate(mkHeapObj("foo", 14)) // foo is updated. + item, err = h.Pop() + if e, a := 14, item.val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 30, item.val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } +} + +// TestHeap_Delete tests Heap.Delete and ensures that heap invariant is +// preserved after deleting items. +func TestHeap_Delete(t *testing.T) { + h := New(testHeapObjectKeyFunc, compareInts) + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("bal", 31)) + h.AddOrUpdate(mkHeapObj("baz", 11)) + + // Delete head. Delete should work with "key" and doesn't care about the value. + if err := h.Delete(mkHeapObj("bar", 200)); err != nil { + t.Fatalf("Failed to delete head.") + } + item, err := h.Pop() + if e, a := 10, item.val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + h.AddOrUpdate(mkHeapObj("zab", 30)) + h.AddOrUpdate(mkHeapObj("faz", 30)) + curLen := h.data.Len() + // Delete non-existing item. + if err = h.Delete(mkHeapObj("non-existent", 10)); err == nil || curLen != h.data.Len() { + t.Fatalf("Didn't expect any item removal") + } + // Delete tail. + if err = h.Delete(mkHeapObj("bal", 31)); err != nil { + t.Fatalf("Failed to delete tail.") + } + // Delete one of the items with value 30. + if err = h.Delete(mkHeapObj("zab", 30)); err != nil { + t.Fatalf("Failed to delete item.") + } + item, err = h.Pop() + if e, a := 11, item.val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 30, item.val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if h.data.Len() != 0 { + t.Fatalf("expected an empty heap.") + } +} + +// TestHeap_AddOrUpdate_Update tests update capabilities of Heap.Update +// and ensures that heap invariant is preserved after adding items. +func TestHeap_AddOrUpdate_Update(t *testing.T) { + h := New(testHeapObjectKeyFunc, compareInts) + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("bal", 31)) + h.AddOrUpdate(mkHeapObj("baz", 11)) + + // Update an item to a value that should push it to the head. + h.AddOrUpdate(mkHeapObj("baz", 0)) + if h.data.queue[0] != "baz" || h.data.items["baz"].index != 0 { + t.Fatalf("expected baz to be at the head") + } + item, err := h.Pop() + if e, a := 0, item.val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + // Update bar to push it farther back in the queue. + h.AddOrUpdate(mkHeapObj("bar", 100)) + if h.data.queue[0] != "foo" || h.data.items["foo"].index != 0 { + t.Fatalf("expected foo to be at the head") + } +} + +// TestHeap_Get tests Heap.Get. +func TestHeap_Get(t *testing.T) { + h := New(testHeapObjectKeyFunc, compareInts) + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("bal", 31)) + h.AddOrUpdate(mkHeapObj("baz", 11)) + + // Get works with the key. + item, exists := h.Get(mkHeapObj("baz", 0)) + if !exists || item.val != 11 { + t.Fatalf("unexpected error in getting element") + } + // Get non-existing object. + _, exists = h.Get(mkHeapObj("non-existing", 0)) + if exists { + t.Fatalf("didn't expect to get any object") + } +} + +// TestHeap_GetByKey tests Heap.GetByKey and is very similar to TestHeap_Get. +func TestHeap_GetByKey(t *testing.T) { + h := New(testHeapObjectKeyFunc, compareInts) + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("bal", 31)) + h.AddOrUpdate(mkHeapObj("baz", 11)) + + item, exists := h.GetByKey("baz") + if !exists || item.val != 11 { + t.Fatalf("unexpected error in getting element") + } + // Get non-existing object. + _, exists = h.GetByKey("non-existing") + if exists { + t.Fatalf("didn't expect to get any object") + } +} + +// TestHeap_List tests Heap.List function. +func TestHeap_List(t *testing.T) { + h := New(testHeapObjectKeyFunc, compareInts) + list := h.List() + if len(list) != 0 { + t.Errorf("expected an empty list") + } + + items := map[string]int{ + "foo": 10, + "bar": 1, + "bal": 30, + "baz": 11, + "faz": 30, + } + for k, v := range items { + h.AddOrUpdate(mkHeapObj(k, v)) + } + list = h.List() + if len(list) != len(items) { + t.Errorf("expected %d items, got %d", len(items), len(list)) + } + for _, heapObj := range list { + v, ok := items[heapObj.name] + if !ok || v != heapObj.val { + t.Errorf("unexpected item in the list: %v", heapObj) + } + } +} + +func TestHeapWithRecorder(t *testing.T) { + metricRecorder := new(testMetricRecorder) + h := NewWithRecorder(testHeapObjectKeyFunc, compareInts, metricRecorder) + h.AddOrUpdate(mkHeapObj("foo", 10)) + h.AddOrUpdate(mkHeapObj("bar", 1)) + h.AddOrUpdate(mkHeapObj("baz", 100)) + h.AddOrUpdate(mkHeapObj("qux", 11)) + + if *metricRecorder != 4 { + t.Errorf("expected count to be 4 but got %d", *metricRecorder) + } + if err := h.Delete(mkHeapObj("bar", 1)); err != nil { + t.Fatal(err) + } + if *metricRecorder != 3 { + t.Errorf("expected count to be 3 but got %d", *metricRecorder) + } + if _, err := h.Pop(); err != nil { + t.Fatal(err) + } + if *metricRecorder != 2 { + t.Errorf("expected count to be 2 but got %d", *metricRecorder) + } + + h.metricRecorder.Clear() + if *metricRecorder != 0 { + t.Errorf("expected count to be 0 but got %d", *metricRecorder) + } +} diff --git a/pkg/scheduler/internal/queue/active_queue.go b/pkg/scheduler/internal/queue/active_queue.go new file mode 100644 index 000000000..a89d865bc --- /dev/null +++ b/pkg/scheduler/internal/queue/active_queue.go @@ -0,0 +1,160 @@ +/* +Copyright 2025 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/karmada-io/karmada/pkg/scheduler/internal/heap" + metrics "github.com/karmada-io/karmada/pkg/scheduler/metrics/queue" +) + +// ActiveQueue defines the interface of activeQ related operations. +type ActiveQueue interface { + Push(bindingInfo *QueuedBindingInfo) + Pop() (*QueuedBindingInfo, bool) + Len() int + Done(bindingInfo *QueuedBindingInfo) + Has(key string) bool + ShutDown() +} + +// NewActiveQueue builds a instance of ActiveQueue. +func NewActiveQueue(metricRecorder metrics.MetricRecorder) ActiveQueue { + q := &activequeue{ + activeBindings: heap.NewWithRecorder[*QueuedBindingInfo](BindingKeyFunc, Less, metricRecorder), + dirtyBindings: sets.Set[string]{}, + processingBindings: sets.Set[string]{}, + cond: sync.NewCond(&sync.Mutex{}), + } + + return q +} + +// activequeue is a priority work queue, which implements a ActiveQueue. +type activequeue struct { + // activeBindings defines the order in which we will work on items. Every + // element of queue should be in the dirtyBindings set and not in the + // processing set. + activeBindings *heap.Heap[*QueuedBindingInfo] + + // dirtyBindings defines all of the items that need to be processed. + dirtyBindings sets.Set[string] + + // Things that are currently being processed are in the processingBindings set. + // These things may be simultaneously in the dirtyBindings set. When we finish + // processingBindings something and remove it from this set, we'll check if + // it's in the dirtyBindings set, and if so, add it to the queue. + processingBindings sets.Set[string] + + cond *sync.Cond + + shuttingDown bool +} + +// Push marks item as needing processing. +func (q *activequeue) Push(bindingInfo *QueuedBindingInfo) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if q.shuttingDown { + return + } + if q.dirtyBindings.Has(bindingInfo.NamespacedKey) { + return + } + + now := time.Now() + bindingInfo.Timestamp = now + if bindingInfo.InitialAttemptTimestamp == nil { + bindingInfo.InitialAttemptTimestamp = &now + } + q.dirtyBindings.Insert(bindingInfo.NamespacedKey) + if q.processingBindings.Has(bindingInfo.NamespacedKey) { + return + } + + q.activeBindings.AddOrUpdate(bindingInfo) + q.cond.Signal() +} + +// Len returns the current queue length, for informational purposes only. You +// shouldn't e.g. gate a call to Push() or Pop() on Len() being a particular +// value, that can't be synchronized properly. +func (q *activequeue) Len() int { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return q.activeBindings.Len() +} + +// Pop blocks until it can return an item to be processed. If shutdown = true, +// the caller should end their goroutine. You must call Done with item when you +// have finished processing it. +func (q *activequeue) Pop() (bindingInfo *QueuedBindingInfo, shutdown bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + for q.activeBindings.Len() == 0 && !q.shuttingDown { + q.cond.Wait() + } + if q.activeBindings.Len() == 0 { + // We must be shutting down. + return nil, true + } + + bindingInfo, _ = q.activeBindings.Pop() + bindingInfo.Attempts++ + q.processingBindings.Insert(bindingInfo.NamespacedKey) + q.dirtyBindings.Delete(bindingInfo.NamespacedKey) + + return bindingInfo, false +} + +// Done marks item as done processing, and if it has been marked as dirty again +// while it was being processed, it will be re-added to the queue for +// re-processing. +func (q *activequeue) Done(bindingInfo *QueuedBindingInfo) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + q.processingBindings.Delete(bindingInfo.NamespacedKey) + if q.dirtyBindings.Has(bindingInfo.NamespacedKey) { + bindingInfo.Timestamp = time.Now() + q.activeBindings.AddOrUpdate(bindingInfo) + q.cond.Signal() + } else if q.processingBindings.Len() == 0 { + q.cond.Signal() + } +} + +// ShutDown will cause q to ignore all new items added to it and +// immediately instruct the worker goroutines to exit. +func (q *activequeue) ShutDown() { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + q.shuttingDown = true + q.cond.Broadcast() +} + +// Has inform if bindingInfo exists in the queue. +func (q *activequeue) Has(key string) bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return q.dirtyBindings.Has(key) +} diff --git a/pkg/scheduler/internal/queue/active_queue_test.go b/pkg/scheduler/internal/queue/active_queue_test.go new file mode 100644 index 000000000..e944674be --- /dev/null +++ b/pkg/scheduler/internal/queue/active_queue_test.go @@ -0,0 +1,65 @@ +/* +Copyright 2025 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "testing" +) + +func TestPriorityQueue(t *testing.T) { + tests := []struct { + name string + testFunc func() bool + }{ + { + name: "Add b1, b2 and Get the binding with highest priority", + testFunc: func() bool { + b1 := &QueuedBindingInfo{NamespacedKey: "foo1", Priority: 2} + b2 := &QueuedBindingInfo{NamespacedKey: "foo2", Priority: 3} + queue := NewActiveQueue(nil) + queue.Push(b1) + queue.Push(b2) + item, shutdown := queue.Pop() + if shutdown { + return false + } + return item.NamespacedKey == b2.NamespacedKey + }, + }, + { + name: "Add b1, b2 and Get the binding which was enqueued first", + testFunc: func() bool { + b1 := &QueuedBindingInfo{NamespacedKey: "foo1"} + b2 := &QueuedBindingInfo{NamespacedKey: "foo2"} + queue := NewActiveQueue(nil) + queue.Push(b1) + queue.Push(b2) + item, shutdown := queue.Pop() + if shutdown { + return false + } + return item.NamespacedKey == b1.NamespacedKey + }, + }, + } + for _, tt := range tests { + success := tt.testFunc() + if !success { + t.Fatalf(`%q should be success, but get failed`, tt.name) + } + } +} diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go new file mode 100755 index 000000000..943285f2f --- /dev/null +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -0,0 +1,385 @@ +/* +Copyright 2025 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + + "github.com/karmada-io/karmada/pkg/scheduler/internal/heap" + metrics "github.com/karmada-io/karmada/pkg/scheduler/metrics/queue" +) + +const ( + // Scheduling queue names + activeQ = "Active" + backoffQ = "Backoff" + unschedulableBindings = "Unschedulable" +) + +const ( + // DefaultBindingMaxInUnschedulableBindingsDuration is the default value for the maximum + // time a binding can stay in unschedulableBindings. If a binding stays in unschedulableBindings + // for longer than this value, the binding will be moved from unschedulableBindings to + // backoffQ or activeQ. If this value is empty, the default value (5min) + // will be used. + DefaultBindingMaxInUnschedulableBindingsDuration = 5 * time.Minute + + // DefaultBindingInitialBackoffDuration is the default value for the initial backoff duration + // for unschedulable bindings. + DefaultBindingInitialBackoffDuration = 1 * time.Second + + // DefaultBindingMaxBackoffDuration is the default value for the max backoff duration + // for unschedulable bindings. + DefaultBindingMaxBackoffDuration = 10 * time.Second +) + +// SchedulingQueue is an interface for a queue to store bindings waiting to be scheduled. +// The interface follows a pattern similar to cache.FIFO and cache.Heap and +// makes it easy to use those data structures as a SchedulingQueue. +type SchedulingQueue interface { + // Push pushes an new binding to activeQ. + Push(bindingInfo *QueuedBindingInfo) + + // PushUnschedulableIfNotPresent pushes an unschedulable binding back to scheduling queue. + PushUnschedulableIfNotPresent(bindingInfo *QueuedBindingInfo) + + // PushBackoffIfNotPresent pushes an failed binding back to scheduling queue. + PushBackoffIfNotPresent(bindingInfo *QueuedBindingInfo) + + // Pop removes the head of the queue and returns it. It blocks if the + // queue is empty and waits until a new binding is added to the queue. + Pop() (*QueuedBindingInfo, bool) + + // Done must be called for binding returned by Pop. This allows the queue to + // keep track of which bindings are currently being processed. + Done(bindingInfo *QueuedBindingInfo) + + // Len returns the length of activeQ. + Len() int + + // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing + // or for success, we'll remove it from backoffQ, but you still have to call `Done` on the queue. + Forget(bindingInfo *QueuedBindingInfo) + + // Run starts the goroutines managing the queue. + Run() + + // Close closes the SchedulingQueue so that the goroutine which is + // waiting to pop items can exit gracefully. + Close() +} + +type schedulingQueueOptions struct { + bindingInitialBackoffDuration time.Duration + bindingMaxBackoffDuration time.Duration + bindingMaxInUnschedulableBindingsDuration time.Duration +} + +// Option configures a PriorityQueue +type Option func(*schedulingQueueOptions) + +// WithBindingInitialBackoffDuration sets binding initial backoff duration for SchedulingQueue. +func WithBindingInitialBackoffDuration(duration time.Duration) Option { + return func(o *schedulingQueueOptions) { + o.bindingInitialBackoffDuration = duration + } +} + +// WithBindingMaxBackoffDuration sets binding max backoff duration for SchedulingQueue. +func WithBindingMaxBackoffDuration(duration time.Duration) Option { + return func(o *schedulingQueueOptions) { + o.bindingMaxBackoffDuration = duration + } +} + +// WithBindingMaxInUnschedulableBindingsDuration sets bindingMaxInUnschedulableBindingsDuration for SchedulingQueue. +func WithBindingMaxInUnschedulableBindingsDuration(duration time.Duration) Option { + return func(o *schedulingQueueOptions) { + o.bindingMaxInUnschedulableBindingsDuration = duration + } +} + +var defaultSchedulingQueueOptions = schedulingQueueOptions{ + bindingInitialBackoffDuration: DefaultBindingInitialBackoffDuration, + bindingMaxBackoffDuration: DefaultBindingMaxBackoffDuration, + bindingMaxInUnschedulableBindingsDuration: DefaultBindingMaxInUnschedulableBindingsDuration, +} + +// NewSchedulingQueue builds a SchedulingQueue instance. +func NewSchedulingQueue(opts ...Option) SchedulingQueue { + options := defaultSchedulingQueueOptions + for _, opt := range opts { + opt(&options) + } + + bq := &prioritySchedulingQueue{ + stop: make(chan struct{}), + bindingInitialBackoffDuration: options.bindingInitialBackoffDuration, + bindingMaxBackoffDuration: options.bindingMaxBackoffDuration, + bindingMaxInUnschedulableBindingsDuration: options.bindingMaxInUnschedulableBindingsDuration, + activeQ: NewActiveQueue(metrics.NewActiveBindingsRecorder()), + backoffQ: heap.NewWithRecorder(BindingKeyFunc, Less, metrics.NewBackoffBindingsRecorder()), + unschedulableBindings: newUnschedulableBindings(metrics.NewUnschedulableBindingsRecorder()), + } + + return bq +} + +// prioritySchedulingQueue implements a SchedulingQueue. +// The head of PriorityQueue is the highest priority pending binding. This structure +// has two sub queues and a additional data structure, namely: activeQ, +// backoffQ and unschedulableBindings. +// - activeQ holds bindings that are being considered for scheduling. +// - backoffQ holds bindings that moved from unschedulableBindings and will move to +// activeQ when their backoff periods complete. +// - unschedulableBindings holds bindings that were already attempted for scheduling and +// are currently determined to be unschedulable. +type prioritySchedulingQueue struct { + // stopCh is used to stop the goroutine which periodically flushes pending bindings. + stop chan struct{} + // lock takes precedence and should be taken first, + // before any other locks in the queue. + lock sync.RWMutex + + // binding initial backoff duration. + bindingInitialBackoffDuration time.Duration + // binding maximum backoff duration. + bindingMaxBackoffDuration time.Duration + // the maximum time a binding can stay in the unschedulableBindings. + bindingMaxInUnschedulableBindingsDuration time.Duration + + // activeQ is a priority queue ordered by priority + activeQ ActiveQueue + // backoffQ is a heap ordered by backoff expiry. Bindings which have completed backoff + // are popped from this heap before the scheduler looks at activeQ. + backoffQ *heap.Heap[*QueuedBindingInfo] + // unschedulableBindings holds bindings that have been tried and determined unschedulable. + unschedulableBindings *UnschedulableBindings +} + +// Run starts the goroutine to flush backoffQ and unschedulableBindings. +func (bq *prioritySchedulingQueue) Run() { + go wait.Until(bq.flushBackoffQCompleted, 1.0*time.Second, bq.stop) + go wait.Until(bq.flushUnschedulableBindingsLeftover, 30*time.Second, bq.stop) +} + +// Close closes the scheduling queue. +func (bq *prioritySchedulingQueue) Close() { + bq.lock.Lock() + defer bq.lock.Unlock() + + close(bq.stop) + bq.activeQ.ShutDown() + bq.unschedulableBindings.clear() +} + +// flushBackoffQCompleted moves all bindings from backoffQ which have completed backoff in to activeQ +func (bq *prioritySchedulingQueue) flushBackoffQCompleted() { + bq.lock.Lock() + defer bq.lock.Unlock() + + for { + bInfo, ok := bq.backoffQ.Peek() + if !ok || bInfo == nil { + break + } + if bq.isBindingBackingoff(bInfo) { + break + } + _, err := bq.backoffQ.Pop() + if err != nil { + klog.Error(err, "Unable to pop binding from backoff queue despite backoff completion", "binding", bInfo.NamespacedKey) + break + } + bq.moveToActiveQ(bInfo) + } +} + +// isBindingBackingoff returns true if a binding is still waiting for its backoff timer. +// If this returns true, the binding should not be re-tried. +func (bq *prioritySchedulingQueue) isBindingBackingoff(bindingInfo *QueuedBindingInfo) bool { + boTime := bq.getBackoffTime(bindingInfo) + return boTime.After(time.Now()) +} + +// calculateBackoffDuration is a helper function for calculating the backoffDuration +// based on the number of attempts the binding has made. +func (bq *prioritySchedulingQueue) calculateBackoffDuration(bindingInfo *QueuedBindingInfo) time.Duration { + if bindingInfo.Attempts == 0 { + // When the Binding hasn't experienced any scheduling attempts, + // they aren't obliged to get a backoff penalty at all. + return 0 + } + + duration := bq.bindingInitialBackoffDuration + for i := 1; i < bindingInfo.Attempts; i++ { + // Use subtraction instead of addition or multiplication to avoid overflow. + if duration > bq.bindingMaxBackoffDuration-duration { + return bq.bindingMaxBackoffDuration + } + duration += duration + } + return duration +} + +// getBackoffTime returns the time that bindingInfo completes backoff +func (bq *prioritySchedulingQueue) getBackoffTime(bindingInfo *QueuedBindingInfo) time.Time { + duration := bq.calculateBackoffDuration(bindingInfo) + backoffTime := bindingInfo.Timestamp.Add(duration) + return backoffTime +} + +// flushUnschedulableBindingsLeftover moves bindings which stay in unschedulableBindings +// longer than bindingMaxInUnschedulableBindingsDuration to activeQ. +func (bq *prioritySchedulingQueue) flushUnschedulableBindingsLeftover() { + bq.lock.Lock() + defer bq.lock.Unlock() + + var bindingsToMove []*QueuedBindingInfo + currentTime := time.Now() + for _, bInfo := range bq.unschedulableBindings.bindingInfoMap { + lastScheduleTime := bInfo.Timestamp + if currentTime.Sub(lastScheduleTime) > bq.bindingMaxInUnschedulableBindingsDuration { + bindingsToMove = append(bindingsToMove, bInfo) + } + } + + for _, bInfo := range bindingsToMove { + bq.moveToActiveQ(bInfo) + } +} + +func (bq *prioritySchedulingQueue) Push(bindingInfo *QueuedBindingInfo) { + bq.lock.Lock() + defer bq.lock.Unlock() + + bq.moveToActiveQ(bindingInfo) +} + +// Pop removes the head of the active queue and returns it. It blocks if the +func (bq *prioritySchedulingQueue) Pop() (*QueuedBindingInfo, bool) { + // activeQ is empty and waits until a new item is added to the queue. + return bq.activeQ.Pop() +} + +func (bq *prioritySchedulingQueue) PushUnschedulableIfNotPresent(bindingInfo *QueuedBindingInfo) { + bq.lock.Lock() + defer bq.lock.Unlock() + + if bq.backoffQ.Has(bindingInfo) || bq.activeQ.Has(bindingInfo.NamespacedKey) { + return + } + + bq.unschedulableBindings.addOrUpdate(bindingInfo) + klog.V(4).Info("Binding moved to an internal scheduling queue", "binding", bindingInfo.NamespacedKey, "queue", unschedulableBindings) +} + +func (bq *prioritySchedulingQueue) PushBackoffIfNotPresent(bindingInfo *QueuedBindingInfo) { + bq.lock.Lock() + defer bq.lock.Unlock() + + if bq.unschedulableBindings.get(bindingInfo.NamespacedKey) != nil || bq.activeQ.Has(bindingInfo.NamespacedKey) { + return + } + + bq.backoffQ.AddOrUpdate(bindingInfo) + klog.V(4).Info("Binding moved to an internal scheduling queue", "binding", bindingInfo.NamespacedKey, "queue", backoffQ) +} + +// Done must be called for binding returned by Pop. This allows the queue to +// keep track of which bindings are currently being processed. +func (bq *prioritySchedulingQueue) Done(bindingInfo *QueuedBindingInfo) { + bq.activeQ.Done(bindingInfo) +} + +func (bq *prioritySchedulingQueue) Len() int { + return bq.activeQ.Len() +} + +func (bq *prioritySchedulingQueue) Forget(bindingInfo *QueuedBindingInfo) { + bq.lock.Lock() + defer bq.lock.Unlock() + + _ = bq.backoffQ.Delete(bindingInfo) +} + +// moveToActiveQ tries to add binding to active queue and remove it from unschedulable and backoff queues. +func (bq *prioritySchedulingQueue) moveToActiveQ(bindingInfo *QueuedBindingInfo) { + bq.activeQ.Push(bindingInfo) + _ = bq.backoffQ.Delete(bindingInfo) // just ignore this not-found error + bq.unschedulableBindings.delete(bindingInfo.NamespacedKey) + klog.V(4).Info("Binding moved to an internal scheduling queue", "binding", bindingInfo.NamespacedKey, "queue", activeQ) +} + +// UnschedulableBindings holds bindings that cannot be scheduled. This data structure +// is used to implement unschedulableBindings. +type UnschedulableBindings struct { + // bindingInfoMap is a map key by a binding's full-name and the value is a pointer to the QueuedBindingInfo. + bindingInfoMap map[string]*QueuedBindingInfo + // unschedulableRecorder updates the counter when elements of an bindingInfoMap + // get added or removed, and it does nothing if it's nil. + unschedulableRecorder metrics.MetricRecorder +} + +// addOrUpdate adds a binding to the unschedulable bindingInfoMap. +func (u *UnschedulableBindings) addOrUpdate(bindingInfo *QueuedBindingInfo) { + if _, exists := u.bindingInfoMap[bindingInfo.NamespacedKey]; !exists { + if u.unschedulableRecorder != nil { + u.unschedulableRecorder.Inc() + } + } + u.bindingInfoMap[bindingInfo.NamespacedKey] = bindingInfo +} + +// delete deletes a binding from the unschedulable bindingInfoMap. +func (u *UnschedulableBindings) delete(bindingKey string) { + if _, exists := u.bindingInfoMap[bindingKey]; exists { + if u.unschedulableRecorder != nil { + u.unschedulableRecorder.Dec() + } + } + delete(u.bindingInfoMap, bindingKey) +} + +// get returns the QueuedBindingInfo if a binding with the same key as the key of the given "binding" +// is found in the map. It returns nil otherwise. +func (u *UnschedulableBindings) get(bindingKey string) *QueuedBindingInfo { + if bindingInfo, exists := u.bindingInfoMap[bindingKey]; exists { + return bindingInfo + } + return nil +} + +// clear removes all the entries from the unschedulable bindingInfoMap. +func (u *UnschedulableBindings) clear() { + u.bindingInfoMap = make(map[string]*QueuedBindingInfo) + if u.unschedulableRecorder != nil { + u.unschedulableRecorder.Clear() + } +} + +// newUnschedulableBindings initializes a new object of UnschedulableBindings. +func newUnschedulableBindings(unschedulableRecorder metrics.MetricRecorder) *UnschedulableBindings { + return &UnschedulableBindings{ + bindingInfoMap: make(map[string]*QueuedBindingInfo), + unschedulableRecorder: unschedulableRecorder, + } +} diff --git a/pkg/scheduler/internal/queue/types.go b/pkg/scheduler/internal/queue/types.go new file mode 100755 index 000000000..337410fa1 --- /dev/null +++ b/pkg/scheduler/internal/queue/types.go @@ -0,0 +1,68 @@ +/* +Copyright 2025 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "time" +) + +// QueuedBindingInfo is a Binding wrapper with additional information related to +// the binding's status in the scheduling queue, such as the timestamp when +// it's added to the queue. +type QueuedBindingInfo struct { + NamespacedKey string + + // The priority of ResourceBinding. + Priority int32 + + // The time binding added to the scheduling queue. + Timestamp time.Time + + // The time when the binding is added to the queue for the first time. The binding may be added + // back to the queue multiple times before it's successfully scheduled. + // It shouldn't be updated once initialized. + InitialAttemptTimestamp *time.Time + + // Number of schedule attempts before successfully scheduled. + // It's used to record the attempts metric and calculate the backoff time this Binding is obliged to get before retrying. + Attempts int +} + +// DeepCopy returns a deep copy of the QueuedBindingInfo object. +func (qbi *QueuedBindingInfo) DeepCopy() *QueuedBindingInfo { + return &QueuedBindingInfo{ + NamespacedKey: qbi.NamespacedKey, + Priority: qbi.Priority, + Timestamp: qbi.Timestamp, + Attempts: qbi.Attempts, + InitialAttemptTimestamp: qbi.InitialAttemptTimestamp, + } +} + +// BindingKeyFunc is the key mapping function of QueuedBindingInfo. +func BindingKeyFunc(bindingInfo *QueuedBindingInfo) string { + return bindingInfo.NamespacedKey +} + +// Less is the function used by the activeQ heap algorithm to sort bindings. +// It sorts bindings based on their priority. When priorities are equal, it uses +// QueuedBindingInfo.timestamp. +func Less(bInfo1, bInfo2 *QueuedBindingInfo) bool { + p1 := bInfo1.Priority + p2 := bInfo2.Priority + return (p1 > p2) || (p1 == p2 && bInfo1.Timestamp.Before(bInfo2.Timestamp)) +} diff --git a/pkg/scheduler/metrics/queue/metric_test.go b/pkg/scheduler/metrics/queue/metric_test.go new file mode 100644 index 000000000..3f25e8f4b --- /dev/null +++ b/pkg/scheduler/metrics/queue/metric_test.go @@ -0,0 +1,103 @@ +/* +Copyright 2025 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + "sync/atomic" + "testing" +) + +var _ MetricRecorder = &fakeBindingsRecorder{} + +type fakeBindingsRecorder struct { + counter int64 +} + +func (r *fakeBindingsRecorder) Inc() { + atomic.AddInt64(&r.counter, 1) +} + +func (r *fakeBindingsRecorder) Dec() { + atomic.AddInt64(&r.counter, -1) +} + +func (r *fakeBindingsRecorder) Clear() { + atomic.StoreInt64(&r.counter, 0) +} + +func TestInc(t *testing.T) { + fakeRecorder := fakeBindingsRecorder{} + var wg sync.WaitGroup + loops := 100 + wg.Add(loops) + for i := 0; i < loops; i++ { + go func() { + fakeRecorder.Inc() + wg.Done() + }() + } + wg.Wait() + if fakeRecorder.counter != int64(loops) { + t.Errorf("Expected %v, got %v", loops, fakeRecorder.counter) + } +} + +func TestDec(t *testing.T) { + fakeRecorder := fakeBindingsRecorder{counter: 100} + var wg sync.WaitGroup + loops := 100 + wg.Add(loops) + for i := 0; i < loops; i++ { + go func() { + fakeRecorder.Dec() + wg.Done() + }() + } + wg.Wait() + if fakeRecorder.counter != int64(0) { + t.Errorf("Expected %v, got %v", loops, fakeRecorder.counter) + } +} + +func TestClear(t *testing.T) { + fakeRecorder := fakeBindingsRecorder{} + var wg sync.WaitGroup + incLoops, decLoops := 100, 80 + wg.Add(incLoops + decLoops) + for i := 0; i < incLoops; i++ { + go func() { + fakeRecorder.Inc() + wg.Done() + }() + } + for i := 0; i < decLoops; i++ { + go func() { + fakeRecorder.Dec() + wg.Done() + }() + } + wg.Wait() + if fakeRecorder.counter != int64(incLoops-decLoops) { + t.Errorf("Expected %v, got %v", incLoops-decLoops, fakeRecorder.counter) + } + // verify Clear() works + fakeRecorder.Clear() + if fakeRecorder.counter != int64(0) { + t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter) + } +} diff --git a/pkg/scheduler/metrics/queue/metrics.go b/pkg/scheduler/metrics/queue/metrics.go new file mode 100755 index 000000000..69da63268 --- /dev/null +++ b/pkg/scheduler/metrics/queue/metrics.go @@ -0,0 +1,121 @@ +/* +Copyright 2025 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +const ( + // SchedulerSubsystem - subsystem name used by scheduler. + SchedulerSubsystem = "scheduler" +) + +// All the histogram based metrics have 1ms as size for the smallest bucket. +var ( + pendingBindings = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: SchedulerSubsystem, + Name: "pending_bindings", + Help: "Number of pending bindings, by the queue type. 'active' means number of bindings in activeQ; 'backoff' means number of bindings in backoffQ; 'unschedulable' means number of bindings in unschedulableBindings that the scheduler attempted to schedule and failed.", + StabilityLevel: metrics.ALPHA, + }, []string{"queue"}) + + metricsList = []metrics.Registerable{ + pendingBindings, + } +) + +func init() { + // Register the metrics. + RegisterMetrics(metricsList...) +} + +// RegisterMetrics registers a list of metrics. +// This function is exported because it is intended to be used by out-of-tree plugins to register their custom metrics. +func RegisterMetrics(extraMetrics ...metrics.Registerable) { + for _, metric := range extraMetrics { + legacyregistry.MustRegister(metric) + } +} + +// ActiveBindings returns the pending bindings metrics with the label active +func ActiveBindings() metrics.GaugeMetric { + return pendingBindings.With(metrics.Labels{"queue": "active"}) +} + +// BackoffBindings returns the pending bindings metrics with the label backoff +func BackoffBindings() metrics.GaugeMetric { + return pendingBindings.With(metrics.Labels{"queue": "backoff"}) +} + +// UnschedulableBindings returns the pending bindings metrics with the label unschedulable +func UnschedulableBindings() metrics.GaugeMetric { + return pendingBindings.With(metrics.Labels{"queue": "unschedulable"}) +} + +// MetricRecorder represents a metric recorder which takes action when the +// metric Inc(), Dec() and Clear() +type MetricRecorder interface { + Inc() + Dec() + Clear() +} + +var _ MetricRecorder = &PendingBindingsRecorder{} + +// PendingBindingsRecorder is an implementation of MetricRecorder +type PendingBindingsRecorder struct { + recorder metrics.GaugeMetric +} + +// NewActiveBindingsRecorder returns ActiveBindings in a Prometheus metric fashion +func NewActiveBindingsRecorder() *PendingBindingsRecorder { + return &PendingBindingsRecorder{ + recorder: ActiveBindings(), + } +} + +// NewUnschedulableBindingsRecorder returns UnschedulableBindings in a Prometheus metric fashion +func NewUnschedulableBindingsRecorder() *PendingBindingsRecorder { + return &PendingBindingsRecorder{ + recorder: UnschedulableBindings(), + } +} + +// NewBackoffBindingsRecorder returns BackoffBindings in a Prometheus metric fashion +func NewBackoffBindingsRecorder() *PendingBindingsRecorder { + return &PendingBindingsRecorder{ + recorder: BackoffBindings(), + } +} + +// Inc increases a metric counter by 1, in an atomic way +func (r *PendingBindingsRecorder) Inc() { + r.recorder.Inc() +} + +// Dec decreases a metric counter by 1, in an atomic way +func (r *PendingBindingsRecorder) Dec() { + r.recorder.Dec() +} + +// Clear set a metric counter to 0, in an atomic way +func (r *PendingBindingsRecorder) Clear() { + r.recorder.Set(float64(0)) +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index daaf7135b..36d1a2186 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -44,6 +44,7 @@ import ( workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" estimatorclient "github.com/karmada-io/karmada/pkg/estimator/client" "github.com/karmada-io/karmada/pkg/events" + "github.com/karmada-io/karmada/pkg/features" karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" @@ -53,6 +54,7 @@ import ( "github.com/karmada-io/karmada/pkg/scheduler/framework" frameworkplugins "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins" "github.com/karmada-io/karmada/pkg/scheduler/framework/runtime" + internalqueue "github.com/karmada-io/karmada/pkg/scheduler/internal/queue" "github.com/karmada-io/karmada/pkg/scheduler/metrics" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" @@ -95,9 +97,12 @@ type Scheduler struct { // clusterReconcileWorker reconciles cluster changes to trigger corresponding // ResourceBinding/ClusterResourceBinding rescheduling. clusterReconcileWorker util.AsyncWorker - // TODO: implement a priority scheduling queue + + // queue is the legacy rate limiting queue which will be replaced by priorityQueue + // in the future releases. queue workqueue.TypedRateLimitingInterface[any] + priorityQueue internalqueue.SchedulingQueue Algorithm core.ScheduleAlgorithm schedulerCache schedulercache.Cache @@ -128,7 +133,7 @@ type schedulerOptions struct { schedulerEstimatorServicePrefix string // schedulerName is the name of the scheduler. Default is "default-scheduler". schedulerName string - //enableEmptyWorkloadPropagation represents whether allow workload with replicas 0 propagated to member clusters should be enabled + // enableEmptyWorkloadPropagation represents whether allow workload with replicas 0 propagated to member clusters should be enabled enableEmptyWorkloadPropagation bool // outOfTreeRegistry represents the registry of out-of-tree plugins outOfTreeRegistry runtime.Registry @@ -239,7 +244,13 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse for _, opt := range opts { opt(&options) } - queue := workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](options.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{Name: "scheduler-queue"}) + var legacyQueue workqueue.TypedRateLimitingInterface[any] + var priorityQueue internalqueue.SchedulingQueue + if features.FeatureGate.Enabled(features.PriorityBasedScheduling) { + priorityQueue = internalqueue.NewSchedulingQueue() + } else { + legacyQueue = workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](options.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{Name: "scheduler-queue"}) + } registry := frameworkplugins.NewInTreeRegistry() if err := registry.Merge(options.outOfTreeRegistry); err != nil { return nil, err @@ -258,7 +269,8 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse clusterBindingLister: clusterBindingLister, clusterLister: clusterLister, informerFactory: factory, - queue: queue, + queue: legacyQueue, + priorityQueue: priorityQueue, Algorithm: algorithm, schedulerCache: schedulerCache, } @@ -310,8 +322,14 @@ func (s *Scheduler) Run(ctx context.Context) { go wait.Until(s.worker, time.Second, stopCh) - <-stopCh - s.queue.ShutDown() + if features.FeatureGate.Enabled(features.PriorityBasedScheduling) { + s.priorityQueue.Run() + <-stopCh + s.priorityQueue.Close() + } else { + <-stopCh + s.queue.ShutDown() + } } func (s *Scheduler) worker() { @@ -320,15 +338,27 @@ func (s *Scheduler) worker() { } func (s *Scheduler) scheduleNext() bool { - key, shutdown := s.queue.Get() - if shutdown { - klog.Errorf("Fail to pop item from queue") - return false - } - defer s.queue.Done(key) + if features.FeatureGate.Enabled(features.PriorityBasedScheduling) { + bindingInfo, shutdown := s.priorityQueue.Pop() + if shutdown { + klog.Errorf("Fail to pop item from priorityQueue") + return false + } + defer s.priorityQueue.Done(bindingInfo) - err := s.doSchedule(key.(string)) - s.handleErr(err, key) + err := s.doSchedule(bindingInfo.NamespacedKey) + s.handleErr(err, bindingInfo) + } else { + key, shutdown := s.queue.Get() + if shutdown { + klog.Errorf("Fail to pop item from queue") + return false + } + defer s.queue.Done(key) + + err := s.doSchedule(key.(string)) + s.legacyHandleErr(err, key) + } return true } @@ -759,12 +789,26 @@ func (s *Scheduler) patchScheduleResultForClusterResourceBinding(oldBinding *wor return nil } -func (s *Scheduler) handleErr(err error, key interface{}) { +func (s *Scheduler) handleErr(err error, bindingInfo *internalqueue.QueuedBindingInfo) { + if err == nil || apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) { + s.priorityQueue.Forget(bindingInfo) + return + } + + var unschedulableErr *framework.UnschedulableError + if !errors.As(err, &unschedulableErr) { + s.priorityQueue.PushUnschedulableIfNotPresent(bindingInfo) + } else { + s.priorityQueue.PushBackoffIfNotPresent(bindingInfo) + } + metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure) +} + +func (s *Scheduler) legacyHandleErr(err error, key interface{}) { if err == nil || apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) { s.queue.Forget(key) return } - s.queue.AddRateLimited(key) metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure) } diff --git a/pkg/scheduler/scheduler_metrics_test.go b/pkg/scheduler/scheduler_metrics_test.go index c3e14488c..7431920cb 100644 --- a/pkg/scheduler/scheduler_metrics_test.go +++ b/pkg/scheduler/scheduler_metrics_test.go @@ -25,10 +25,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" dynamicfake "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" "k8s.io/component-base/metrics/testutil" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/features" karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake" + internalqueue "github.com/karmada-io/karmada/pkg/scheduler/internal/queue" "github.com/karmada-io/karmada/pkg/scheduler/metrics" ) @@ -69,10 +72,18 @@ var ( scheduler.onClusterResourceBindingRequeue(crb, metrics.ClusterChanged) } scheduleAttemptSuccess = func(scheduler *Scheduler, obj interface{}) { - scheduler.handleErr(nil, obj) + rb := obj.(*workv1alpha2.ResourceBinding) + scheduler.handleErr(nil, &internalqueue.QueuedBindingInfo{ + NamespacedKey: cache.ObjectName{Namespace: rb.Namespace, Name: rb.Namespace}.String(), + Priority: rb.Spec.SchedulePriorityValue(), + }) } scheduleAttemptFailure = func(scheduler *Scheduler, obj interface{}) { - scheduler.handleErr(fmt.Errorf("schedule attempt failure"), obj) + rb := obj.(*workv1alpha2.ResourceBinding) + scheduler.handleErr(fmt.Errorf("schedule attempt failure"), &internalqueue.QueuedBindingInfo{ + NamespacedKey: cache.ObjectName{Namespace: rb.Namespace, Name: rb.Namespace}.String(), + Priority: rb.Spec.SchedulePriorityValue(), + }) } ) @@ -81,6 +92,8 @@ func TestIncomingBindingMetrics(t *testing.T) { karmadaClient := karmadafake.NewSimpleClientset() kubeClient := fake.NewSimpleClientset() + // enable "PriorityBasedScheduling" feature gate. + _ = features.FeatureGate.Set("PriorityBasedScheduling=true") sche, err := NewScheduler(dynamicClient, karmadaClient, kubeClient) if err != nil { t.Errorf("create scheduler error: %s", err) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 59ca28f59..4f4af6bba 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -35,14 +35,15 @@ import ( "k8s.io/client-go/kubernetes/scheme" clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/features" karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake" workv1alpha2lister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2" "github.com/karmada-io/karmada/pkg/scheduler/core" schedulercore "github.com/karmada-io/karmada/pkg/scheduler/core" + internalqueue "github.com/karmada-io/karmada/pkg/scheduler/internal/queue" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/grpcconnection" @@ -1065,26 +1066,31 @@ func TestWorkerAndScheduleNext(t *testing.T) { testCases := []struct { name string key string + priority int32 shutdown bool expectResult bool }{ { name: "Schedule ResourceBinding", key: "default/test-binding", + priority: 10, shutdown: false, expectResult: true, }, { name: "Schedule ClusterResourceBinding", key: "test-cluster-binding", + priority: 5, shutdown: false, expectResult: true, }, } + // enable "PriorityBasedScheduling" feature gate. + _ = features.FeatureGate.Set("PriorityBasedScheduling=true") for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - queue := workqueue.NewTypedRateLimitingQueue[any](workqueue.DefaultTypedControllerRateLimiter[any]()) + queue := internalqueue.NewSchedulingQueue() bindingLister := &fakeBindingLister{binding: resourceBinding} clusterBindingLister := &fakeClusterBindingLister{binding: clusterResourceBinding} @@ -1103,17 +1109,20 @@ func TestWorkerAndScheduleNext(t *testing.T) { s := &Scheduler{ KarmadaClient: fakeClient, - queue: queue, + priorityQueue: queue, bindingLister: bindingLister, clusterBindingLister: clusterBindingLister, Algorithm: mockAlgo, eventRecorder: eventRecorder, } - s.queue.Add(tc.key) + s.priorityQueue.Push(&internalqueue.QueuedBindingInfo{ + NamespacedKey: tc.key, + Priority: tc.priority, + }) if tc.shutdown { - s.queue.ShutDown() + s.priorityQueue.Close() } result := s.scheduleNext() @@ -1121,7 +1130,7 @@ func TestWorkerAndScheduleNext(t *testing.T) { assert.Equal(t, tc.expectResult, result, "scheduleNext return value mismatch") if !tc.shutdown { - assert.Equal(t, 0, s.queue.Len(), "Queue should be empty after processing") + assert.Equal(t, 0, s.priorityQueue.Len(), "Queue should be empty after processing") } }) } @@ -1224,6 +1233,7 @@ func TestCreateScheduler(t *testing.T) { enableEmptyWorkloadPropagation bool plugins []string rateLimiterOptions ratelimiterflag.Options + enablePriorityBasedScheduling bool }{ { name: "scheduler with default configuration", @@ -1313,11 +1323,11 @@ func TestCreateScheduler(t *testing.T) { plugins: mockPlugins, }, { - name: "scheduler with RateLimiterOptions", + name: "scheduler with PriorityBasedScheduling enabled", opts: []Option{ WithRateLimiterOptions(mockRateLimiterOptions), }, - rateLimiterOptions: mockRateLimiterOptions, + enablePriorityBasedScheduling: true, }, } @@ -1358,8 +1368,8 @@ func TestCreateScheduler(t *testing.T) { if len(tc.plugins) > 0 && sche.Algorithm == nil { t.Errorf("expected Algorithm to be set when plugins are provided") } - if tc.rateLimiterOptions != (ratelimiterflag.Options{}) && sche.queue == nil { - t.Errorf("expected queue to be set when rate limiter options are provided") + if tc.enablePriorityBasedScheduling && sche.priorityQueue == nil { + t.Errorf("expected priorityQueue to be set when feature gate %q is enabled", features.PriorityBasedScheduling) } }) }