Handle V1Alpha2 version for Queue in Code

This commit is contained in:
Thandayuthapani 2019-07-09 18:23:06 +05:30
parent 247a06f759
commit 0d42f29bbd
8 changed files with 256 additions and 30 deletions

View File

@ -1,4 +1,5 @@
volcano.sh/volcano/pkg/apis/scheduling/v1alpha1
volcano.sh/volcano/pkg/apis/scheduling/v1alpha2
volcano.sh/volcano/pkg/apis/utils
volcano.sh/volcano/pkg/scheduler/actions/allocate
volcano.sh/volcano/pkg/scheduler/actions/backfill

View File

@ -173,7 +173,7 @@ func TestAllocate(t *testing.T) {
}
for _, q := range test.queues {
schedulerCache.AddQueue(q)
schedulerCache.AddQueuev1alpha1(q)
}
trueValue := true

View File

@ -165,7 +165,7 @@ func TestPreempt(t *testing.T) {
}
for _, q := range test.queues {
schedulerCache.AddQueue(q)
schedulerCache.AddQueuev1alpha1(q)
}
trueValue := true

View File

@ -134,7 +134,7 @@ func TestReclaim(t *testing.T) {
}
for _, q := range test.queues {
schedulerCache.AddQueue(q)
schedulerCache.AddQueuev1alpha1(q)
}
trueValue := true

View File

@ -20,10 +20,10 @@ import (
"encoding/json"
"github.com/golang/glog"
"volcano.sh/volcano/pkg/apis/scheduling/v1alpha1"
"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"volcano.sh/volcano/pkg/apis/scheduling/v1alpha1"
"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
)
//PodGroupConditionType is of string type which represents podGroup Condition

View File

@ -17,11 +17,56 @@ limitations under the License.
package api
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
arbcorev1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1"
)
const (
//QueueVersionV1Alpha1 represents PodGroupVersion of V1Alpha1
QueueVersionV1Alpha1 string = "v1alpha1"
//QueueVersionV1Alpha2 represents PodGroupVersion of V1Alpha2
QueueVersionV1Alpha2 string = "v1alpha2"
)
// Queue is a queue of PodGroup.
type Queue struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Specification of the desired behavior of the queue.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status
// +optional
Spec QueueSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
// The status of queue.
// +optional
Status QueueStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
//Version is used to retrieve information about queue version
Version string
}
// QueueStatus represents the status of Queue.
type QueueStatus struct {
// The number of 'Unknonw' PodGroup in this queue.
Unknown int32 `json:"unknown,omitempty" protobuf:"bytes,1,opt,name=unknown"`
// The number of 'Pending' PodGroup in this queue.
Pending int32 `json:"pending,omitempty" protobuf:"bytes,2,opt,name=pending"`
// The number of 'Running' PodGroup in this queue.
Running int32 `json:"running,omitempty" protobuf:"bytes,3,opt,name=running"`
}
// QueueSpec represents the template of Queue.
type QueueSpec struct {
Weight int32 `json:"weight,omitempty" protobuf:"bytes,1,opt,name=weight"`
Capability v1.ResourceList `json:"capability,omitempty" protobuf:"bytes,2,opt,name=capability"`
}
// QueueID is UID type, serves as unique ID for each queue
type QueueID types.UID
@ -32,11 +77,11 @@ type QueueInfo struct {
Weight int32
Queue *arbcorev1.Queue
Queue *Queue
}
// NewQueueInfo creates new queueInfo object
func NewQueueInfo(queue *arbcorev1.Queue) *QueueInfo {
func NewQueueInfo(queue *Queue) *QueueInfo {
return &QueueInfo{
UID: QueueID(queue.Name),
Name: queue.Name,

View File

@ -85,7 +85,8 @@ type SchedulerCache struct {
nsInformer infov1.NamespaceInformer
podGroupInformerv1alpha1 kbinfov1.PodGroupInformer
podGroupInformerv1alpha2 kbinfov2.PodGroupInformer
queueInformer kbinfov1.QueueInformer
queueInformerv1alpha1 kbinfov1.QueueInformer
queueInformerv1alpha2 kbinfov2.QueueInformer
pvInformer infov1.PersistentVolumeInformer
pvcInformer infov1.PersistentVolumeClaimInformer
scInformer storagev1.StorageClassInformer
@ -370,12 +371,20 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
DeleteFunc: sc.DeletePodGroupAlpha2,
})
// create informer for Queue information
sc.queueInformer = kbinformer.Scheduling().V1alpha1().Queues()
sc.queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddQueue,
UpdateFunc: sc.UpdateQueue,
DeleteFunc: sc.DeleteQueue,
// create informer(v1alpha1) for Queue information
sc.queueInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().Queues()
sc.queueInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddQueuev1alpha1,
UpdateFunc: sc.UpdateQueuev1alpha1,
DeleteFunc: sc.DeleteQueuev1alpha1,
})
// create informer(v1alpha2) for Queue information
sc.queueInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().Queues()
sc.queueInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddQueuev1alpha2,
UpdateFunc: sc.UpdateQueuev1alpha2,
DeleteFunc: sc.DeleteQueuev1alpha2,
})
return sc
@ -391,7 +400,8 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
go sc.pvInformer.Informer().Run(stopCh)
go sc.pvcInformer.Informer().Run(stopCh)
go sc.scInformer.Informer().Run(stopCh)
go sc.queueInformer.Informer().Run(stopCh)
go sc.queueInformerv1alpha1.Informer().Run(stopCh)
go sc.queueInformerv1alpha2.Informer().Run(stopCh)
if options.ServerOpts.EnablePriorityClass {
go sc.pcInformer.Informer().Run(stopCh)
@ -418,7 +428,8 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
sc.pvInformer.Informer().HasSynced,
sc.pvcInformer.Informer().HasSynced,
sc.scInformer.Informer().HasSynced,
sc.queueInformer.Informer().HasSynced,
sc.queueInformerv1alpha1.Informer().HasSynced,
sc.queueInformerv1alpha2.Informer().HasSynced,
}
if options.ServerOpts.EnablePriorityClass {
informerSynced = append(informerSynced, sc.pcInformer.Informer().HasSynced)

View File

@ -769,19 +769,31 @@ func (sc *SchedulerCache) DeletePDB(obj interface{}) {
return
}
//AddQueue add queue to scheduler cache
func (sc *SchedulerCache) AddQueue(obj interface{}) {
//AddQueuev1alpha1 add queue to scheduler cache
func (sc *SchedulerCache) AddQueuev1alpha1(obj interface{}) {
ss, ok := obj.(*kbv1.Queue)
if !ok {
glog.Errorf("Cannot convert to *kbv1.Queue: %v", obj)
return
}
marshalled, err := json.Marshal(*ss)
if err != nil {
glog.Errorf("Failed to Marshal Queue %s with error: %v", ss.Name, err)
}
queue := &kbapi.Queue{}
err = json.Unmarshal(marshalled, queue)
if err != nil {
glog.Errorf("Failed to Unmarshal Data into api.Queue type with error: %v", err)
}
queue.Version = kbapi.QueueVersionV1Alpha1
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
glog.V(4).Infof("Add Queue(%s) into cache, spec(%#v)", ss.Name, ss.Spec)
err := sc.addQueue(ss)
err = sc.addQueue(queue)
if err != nil {
glog.Errorf("Failed to add Queue %s into cache: %v", ss.Name, err)
return
@ -789,8 +801,40 @@ func (sc *SchedulerCache) AddQueue(obj interface{}) {
return
}
//UpdateQueue update queue to scheduler cache
func (sc *SchedulerCache) UpdateQueue(oldObj, newObj interface{}) {
//AddQueuev1alpha2 add queue to scheduler cache
func (sc *SchedulerCache) AddQueuev1alpha2(obj interface{}) {
ss, ok := obj.(*kbv2.Queue)
if !ok {
glog.Errorf("Cannot convert to *kbv2.Queue: %v", obj)
return
}
marshalled, err := json.Marshal(*ss)
if err != nil {
glog.Errorf("Failed to Marshal Queue %s with error: %v", ss.Name, err)
}
queue := &kbapi.Queue{}
err = json.Unmarshal(marshalled, queue)
if err != nil {
glog.Errorf("Failed to Unmarshal Data into api.Queue type with error: %v", err)
}
queue.Version = kbapi.QueueVersionV1Alpha2
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
glog.V(4).Infof("Add Queue(%s) into cache, spec(%#v)", ss.Name, ss.Spec)
err = sc.addQueue(queue)
if err != nil {
glog.Errorf("Failed to add Queue %s into cache: %v", ss.Name, err)
return
}
return
}
//UpdateQueuev1alpha1 update queue to scheduler cache
func (sc *SchedulerCache) UpdateQueuev1alpha1(oldObj, newObj interface{}) {
oldSS, ok := oldObj.(*kbv1.Queue)
if !ok {
glog.Errorf("Cannot convert oldObj to *kbv1.Queue: %v", oldObj)
@ -802,10 +846,34 @@ func (sc *SchedulerCache) UpdateQueue(oldObj, newObj interface{}) {
return
}
oldMarshalled, err := json.Marshal(*oldSS)
if err != nil {
glog.Errorf("Failed to Marshal Queue %s with error: %v", oldSS.Name, err)
}
oldQueue := &kbapi.Queue{}
err = json.Unmarshal(oldMarshalled, oldQueue)
if err != nil {
glog.Errorf("Failed to Unmarshal Data into api.Queue type with error: %v", err)
}
oldQueue.Version = kbapi.QueueVersionV1Alpha1
newMarshalled, err := json.Marshal(*newSS)
if err != nil {
glog.Errorf("Failed to Marshal Queue %s with error: %v", newSS.Name, err)
}
newQueue := &kbapi.Queue{}
err = json.Unmarshal(newMarshalled, newQueue)
if err != nil {
glog.Errorf("Failed to Unmarshal Data into api.Queue type with error: %v", err)
}
newQueue.Version = kbapi.QueueVersionV1Alpha1
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
err := sc.updateQueue(oldSS, newSS)
err = sc.updateQueue(oldQueue, newQueue)
if err != nil {
glog.Errorf("Failed to update Queue %s into cache: %v", oldSS.Name, err)
return
@ -813,8 +881,56 @@ func (sc *SchedulerCache) UpdateQueue(oldObj, newObj interface{}) {
return
}
//DeleteQueue delete queue from the scheduler cache
func (sc *SchedulerCache) DeleteQueue(obj interface{}) {
//UpdateQueuev1alpha2 update queue to scheduler cache
func (sc *SchedulerCache) UpdateQueuev1alpha2(oldObj, newObj interface{}) {
oldSS, ok := oldObj.(*kbv2.Queue)
if !ok {
glog.Errorf("Cannot convert oldObj to *kbv2.Queue: %v", oldObj)
return
}
newSS, ok := newObj.(*kbv2.Queue)
if !ok {
glog.Errorf("Cannot convert newObj to *kbv2.Queue: %v", newObj)
return
}
oldMarshalled, err := json.Marshal(*oldSS)
if err != nil {
glog.Errorf("Failed to Marshal Queue %s with error: %v", oldSS.Name, err)
}
oldQueue := &kbapi.Queue{}
err = json.Unmarshal(oldMarshalled, oldQueue)
if err != nil {
glog.Errorf("Failed to Unmarshal Data into api.Queue type with error: %v", err)
}
oldQueue.Version = kbapi.QueueVersionV1Alpha2
newMarshalled, err := json.Marshal(*newSS)
if err != nil {
glog.Errorf("Failed to Marshal Queue %s with error: %v", newSS.Name, err)
}
newQueue := &kbapi.Queue{}
err = json.Unmarshal(newMarshalled, newQueue)
if err != nil {
glog.Errorf("Failed to Unmarshal Data into api.Queue type with error: %v", err)
}
newQueue.Version = kbapi.QueueVersionV1Alpha2
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
err = sc.updateQueue(oldQueue, newQueue)
if err != nil {
glog.Errorf("Failed to update Queue %s into cache: %v", oldSS.Name, err)
return
}
return
}
//DeleteQueuev1alpha1 delete queue from the scheduler cache
func (sc *SchedulerCache) DeleteQueuev1alpha1(obj interface{}) {
var ss *kbv1.Queue
switch t := obj.(type) {
case *kbv1.Queue:
@ -831,10 +947,22 @@ func (sc *SchedulerCache) DeleteQueue(obj interface{}) {
return
}
marshalled, err := json.Marshal(*ss)
if err != nil {
glog.Errorf("Failed to Marshal Queue %s with error: %v", ss.Name, err)
}
queue := &kbapi.Queue{}
err = json.Unmarshal(marshalled, queue)
if err != nil {
glog.Errorf("Failed to Unmarshal Data into api.Queue type with error: %v", err)
}
queue.Version = kbapi.QueueVersionV1Alpha1
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
err := sc.deleteQueue(ss)
err = sc.deleteQueue(queue)
if err != nil {
glog.Errorf("Failed to delete Queue %s from cache: %v", ss.Name, err)
return
@ -842,21 +970,62 @@ func (sc *SchedulerCache) DeleteQueue(obj interface{}) {
return
}
func (sc *SchedulerCache) addQueue(queue *kbv1.Queue) error {
//DeleteQueuev1alpha2 delete queue from the scheduler cache
func (sc *SchedulerCache) DeleteQueuev1alpha2(obj interface{}) {
var ss *kbv2.Queue
switch t := obj.(type) {
case *kbv2.Queue:
ss = t
case cache.DeletedFinalStateUnknown:
var ok bool
ss, ok = t.Obj.(*kbv2.Queue)
if !ok {
glog.Errorf("Cannot convert to *kbv2.Queue: %v", t.Obj)
return
}
default:
glog.Errorf("Cannot convert to *kbv1.Queue: %v", t)
return
}
marshalled, err := json.Marshal(*ss)
if err != nil {
glog.Errorf("Failed to Marshal Queue %s with error: %v", ss.Name, err)
}
queue := &kbapi.Queue{}
err = json.Unmarshal(marshalled, queue)
if err != nil {
glog.Errorf("Failed to Unmarshal Data into api.Queue type with error: %v", err)
}
queue.Version = kbapi.QueueVersionV1Alpha2
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
err = sc.deleteQueue(queue)
if err != nil {
glog.Errorf("Failed to delete Queue %s from cache: %v", ss.Name, err)
return
}
return
}
func (sc *SchedulerCache) addQueue(queue *kbapi.Queue) error {
qi := kbapi.NewQueueInfo(queue)
sc.Queues[qi.UID] = qi
return nil
}
func (sc *SchedulerCache) updateQueue(oldObj, newObj *kbv1.Queue) error {
func (sc *SchedulerCache) updateQueue(oldObj, newObj *kbapi.Queue) error {
sc.deleteQueue(oldObj)
sc.addQueue(newObj)
return nil
}
func (sc *SchedulerCache) deleteQueue(queue *kbv1.Queue) error {
func (sc *SchedulerCache) deleteQueue(queue *kbapi.Queue) error {
qi := kbapi.NewQueueInfo(queue)
delete(sc.Queues, qi.UID)