diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 39797d96e..590f0fe81 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -19,7 +19,6 @@ package allocate import ( "github.com/golang/glog" - "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/util" @@ -47,7 +46,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { jobsMap := map[api.QueueID]*util.PriorityQueue{} for _, job := range ssn.Jobs { - if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { + if job.PodGroup.Status.Phase == api.PodGroupPending { continue } if vr := ssn.JobValid(job); vr != nil && !vr.Pass { diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go index b97053ac4..76f5983f0 100644 --- a/pkg/scheduler/actions/allocate/allocate_test.go +++ b/pkg/scheduler/actions/allocate/allocate_test.go @@ -169,7 +169,7 @@ func TestAllocate(t *testing.T) { } for _, ss := range test.podGroups { - schedulerCache.AddPodGroup(ss) + schedulerCache.AddPodGroupAlpha1(ss) } for _, q := range test.queues { diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index c02a15089..d84a150cf 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -19,7 +19,6 @@ package backfill import ( "github.com/golang/glog" - "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/framework" ) @@ -44,7 +43,7 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) { // TODO (k82cn): When backfill, it's also need to balance between Queues. 
for _, job := range ssn.Jobs { - if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { + if job.PodGroup.Status.Phase == api.PodGroupPending { continue } if vr := ssn.JobValid(job); vr != nil && !vr.Pass { diff --git a/pkg/scheduler/actions/enqueue/enqueue.go b/pkg/scheduler/actions/enqueue/enqueue.go index ba93f889c..40bb852ae 100644 --- a/pkg/scheduler/actions/enqueue/enqueue.go +++ b/pkg/scheduler/actions/enqueue/enqueue.go @@ -19,7 +19,6 @@ package enqueue import ( "github.com/golang/glog" - "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/util" @@ -63,7 +62,7 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { } } - if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { + if job.PodGroup.Status.Phase == api.PodGroupPending { if _, found := jobsMap[job.Queue]; !found { jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) } @@ -112,7 +111,7 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { } if inqueue { - job.PodGroup.Status.Phase = v1alpha1.PodGroupInqueue + job.PodGroup.Status.Phase = api.PodGroupInqueue ssn.Jobs[job.UID] = job } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 80e348e85..6d3f2d594 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -21,7 +21,6 @@ import ( "github.com/golang/glog" - "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/metrics" @@ -53,7 +52,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) { queues := map[api.QueueID]*api.QueueInfo{} for _, job := range ssn.Jobs { - if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { + if job.PodGroup.Status.Phase == api.PodGroupPending { continue } if vr := ssn.JobValid(job); vr != nil && !vr.Pass { diff --git a/pkg/scheduler/actions/preempt/preempt_test.go b/pkg/scheduler/actions/preempt/preempt_test.go index ae89ca836..dfa1381cb 100644 --- a/pkg/scheduler/actions/preempt/preempt_test.go +++ b/pkg/scheduler/actions/preempt/preempt_test.go @@ -161,7 +161,7 @@ func TestPreempt(t *testing.T) { } for _, ss := range test.podGroups { - schedulerCache.AddPodGroup(ss) + schedulerCache.AddPodGroupAlpha1(ss) } for _, q := range test.queues { diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 7ec7eef83..92a500d7e 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -19,7 +19,6 @@ package reclaim import ( "github.com/golang/glog" - "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/util" @@ -54,7 +53,7 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { var underRequest []*api.JobInfo for _, job := range ssn.Jobs { - if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending { + if job.PodGroup.Status.Phase == api.PodGroupPending { continue } if vr := ssn.JobValid(job); vr != nil && !vr.Pass { diff --git a/pkg/scheduler/actions/reclaim/reclaim_test.go b/pkg/scheduler/actions/reclaim/reclaim_test.go index b85c2d48f..242344233 100644 --- a/pkg/scheduler/actions/reclaim/reclaim_test.go +++ b/pkg/scheduler/actions/reclaim/reclaim_test.go @@ -130,7 +130,7 @@ func TestReclaim(t *testing.T) { } for _, ss := 
range test.podGroups { - schedulerCache.AddPodGroup(ss) + schedulerCache.AddPodGroupAlpha1(ss) } for _, q := range test.queues { diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index 861ecd0d5..80b9d9a58 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -150,7 +150,7 @@ type JobInfo struct { TotalRequest *Resource CreationTimestamp metav1.Time - PodGroup *v1alpha1.PodGroup + PodGroup *PodGroup // TODO(k82cn): keep backward compatibility, removed it when v1alpha1 finalized. PDB *policyv1.PodDisruptionBudget @@ -186,7 +186,7 @@ func (ji *JobInfo) UnsetPodGroup() { } // SetPodGroup sets podGroup details to a job -func (ji *JobInfo) SetPodGroup(pg *v1alpha1.PodGroup) { +func (ji *JobInfo) SetPodGroup(pg *PodGroup) { ji.Name = pg.Name ji.Namespace = pg.Namespace ji.MinAvailable = pg.Spec.MinMember @@ -309,7 +309,7 @@ func (ji *JobInfo) Clone() *JobInfo { NodesFitErrors: make(map[TaskID]*FitErrors), PDB: ji.PDB, - PodGroup: ji.PodGroup.DeepCopy(), + PodGroup: ji.PodGroup, TaskStatusIndex: map[TaskStatus]tasksMap{}, Tasks: tasksMap{}, diff --git a/pkg/scheduler/api/pod_group_info.go b/pkg/scheduler/api/pod_group_info.go new file mode 100644 index 000000000..a3dd96eee --- /dev/null +++ b/pkg/scheduler/api/pod_group_info.go @@ -0,0 +1,221 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "encoding/json" + + "github.com/golang/glog" + "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" + "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +//PodGroupConditionType is of string type which represents podGroup Condition +type PodGroupConditionType string + +const ( + //PodGroupUnschedulableType represents unschedulable podGroup condition + PodGroupUnschedulableType PodGroupConditionType = "Unschedulable" +) + +// PodGroupPhase is the phase of a pod group at the current time. +type PodGroupPhase string + +// These are the valid phase of podGroups. +const ( + //PodGroupVersionV1Alpha1 represents PodGroupVersion of V1Alpha1 + PodGroupVersionV1Alpha1 string = "v1alpha1" + + //PodGroupVersionV1Alpha2 represents PodGroupVersion of V1Alpha2 + PodGroupVersionV1Alpha2 string = "v1alpha2" + // PodPending means the pod group has been accepted by the system, but scheduler can not allocate + // enough resources to it. + PodGroupPending PodGroupPhase = "Pending" + + // PodRunning means `spec.minMember` pods of PodGroups has been in running phase. + PodGroupRunning PodGroupPhase = "Running" + + // PodGroupUnknown means part of `spec.minMember` pods are running but the other part can not + // be scheduled, e.g. not enough resource; scheduler will wait for related controller to recover it. 
+ PodGroupUnknown PodGroupPhase = "Unknown" + + // PodGroupInqueue means controllers can start to create pods, + // is a new state between PodGroupPending and PodGroupRunning + PodGroupInqueue PodGroupPhase = "Inqueue" +) + +// PodGroupCondition contains details for the current state of this pod group. +type PodGroupCondition struct { + // Type is the type of the condition + Type PodGroupConditionType `json:"type,omitempty" protobuf:"bytes,1,opt,name=type"` + + // Status is the status of the condition. + Status v1.ConditionStatus `json:"status,omitempty" protobuf:"bytes,2,opt,name=status"` + + // The ID of condition transition. + TransitionID string `json:"transitionID,omitempty" protobuf:"bytes,3,opt,name=transitionID"` + + // Last time the phase transitioned from another to current phase. + // +optional + LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty" protobuf:"bytes,4,opt,name=lastTransitionTime"` + + // Unique, one-word, CamelCase reason for the phase's last transition. + // +optional + Reason string `json:"reason,omitempty" protobuf:"bytes,5,opt,name=reason"` + + // Human-readable message indicating details about last transition. + // +optional + Message string `json:"message,omitempty" protobuf:"bytes,6,opt,name=message"` +} + +// PodGroup is a collection of Pod; used for batch workload. +type PodGroup struct { + metav1.TypeMeta `json:",inline"` + // Standard object's metadata. + // More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata + // +optional + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + + // Specification of the desired behavior of the pod group. + // More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status + // +optional + Spec PodGroupSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"` + + // Status represents the current information about a pod group. + // This data may not be up to date. + // +optional + Status PodGroupStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"` + + //Version represents the version of PodGroup + Version string +} + +// PodGroupSpec represents the template of a pod group. +type PodGroupSpec struct { + // MinMember defines the minimal number of members/tasks to run the pod group; + // if there's not enough resources to start all tasks, the scheduler + // will not start anyone. + MinMember int32 `json:"minMember,omitempty" protobuf:"bytes,1,opt,name=minMember"` + + // Queue defines the queue to allocate resource for PodGroup; if queue does not exist, + // the PodGroup will not be scheduled. + Queue string `json:"queue,omitempty" protobuf:"bytes,2,opt,name=queue"` + + // If specified, indicates the PodGroup's priority. "system-node-critical" and + // "system-cluster-critical" are two special keywords which indicate the + // highest priorities with the former being the highest priority. Any other + // name must be defined by creating a PriorityClass object with that name. + // If not specified, the PodGroup priority will be default or zero if there is no + // default. + // +optional + PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,3,opt,name=priorityClassName"` + + // MinResources defines the minimal resource of members/tasks to run the pod group; + // if there's not enough resources to start all tasks, the scheduler + // will not start anyone. 
+ MinResources *v1.ResourceList `json:"minResources,omitempty" protobuf:"bytes,4,opt,name=minResources"` +} + +// PodGroupStatus represents the current state of a pod group. +type PodGroupStatus struct { + // Current phase of PodGroup. + Phase PodGroupPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase"` + + // The conditions of PodGroup. + // +optional + Conditions []PodGroupCondition `json:"conditions,omitempty" protobuf:"bytes,2,opt,name=conditions"` + + // The number of actively running pods. + // +optional + Running int32 `json:"running,omitempty" protobuf:"bytes,3,opt,name=running"` + + // The number of pods which reached phase Succeeded. + // +optional + Succeeded int32 `json:"succeeded,omitempty" protobuf:"bytes,4,opt,name=succeeded"` + + // The number of pods which reached phase Failed. + // +optional + Failed int32 `json:"failed,omitempty" protobuf:"bytes,5,opt,name=failed"` +} + +//ConvertPodGroupInfoToV1Alpha converts api.PodGroup type to v1alpha1.PodGroup +func ConvertPodGroupInfoToV1Alpha(pg *PodGroup) (*v1alpha1.PodGroup, error) { + marshalled, err := json.Marshal(*pg) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", pg.Name, err) + } + + convertedPg := &v1alpha1.PodGroup{} + err = json.Unmarshal(marshalled, convertedPg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into v1alpha1.PodGroup type with error: %v", err) + } + + return convertedPg, nil +} + +//ConvertV1Alpha1ToPodGroupInfo converts v1alpha1.PodGroup to api.PodGroup type +func ConvertV1Alpha1ToPodGroupInfo(pg *v1alpha1.PodGroup) (*PodGroup, error) { + marshalled, err := json.Marshal(*pg) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", pg.Name, err) + } + + convertedPg := &PodGroup{} + err = json.Unmarshal(marshalled, convertedPg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err) + } + convertedPg.Version = PodGroupVersionV1Alpha1 + + return convertedPg, nil +} + +//ConvertPodGroupInfoToV2Alpha converts api.PodGroup type to v1alpha2.PodGroup +func ConvertPodGroupInfoToV2Alpha(pg *PodGroup) (*v1alpha2.PodGroup, error) { + marshalled, err := json.Marshal(*pg) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", pg.Name, err) + } + + convertedPg := &v1alpha2.PodGroup{} + err = json.Unmarshal(marshalled, convertedPg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into v1alpha2.PodGroup type with error: %v", err) + } + + return convertedPg, nil +} + +//ConvertV1Alpha2ToPodGroupInfo converts v1alpha2.PodGroup to api.PodGroup type +func ConvertV1Alpha2ToPodGroupInfo(pg *v1alpha2.PodGroup) (*PodGroup, error) { + marshalled, err := json.Marshal(*pg) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", pg.Name, err) + } + + convertedPg := &PodGroup{} + err = json.Unmarshal(marshalled, convertedPg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err) + } + convertedPg.Version = PodGroupVersionV1Alpha2 + + return convertedPg, nil +} diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 589c42f2c..14494b525 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -50,6 +50,7 @@ import ( kbschema "volcano.sh/volcano/pkg/client/clientset/versioned/scheme" kbinfo "volcano.sh/volcano/pkg/client/informers/externalversions" kbinfov1 "volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha1" + kbinfov2 
"volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha2" "volcano.sh/volcano/pkg/scheduler/api" kbapi "volcano.sh/volcano/pkg/scheduler/api" ) @@ -78,16 +79,17 @@ type SchedulerCache struct { // schedulerName is the name for kube batch scheduler schedulerName string - podInformer infov1.PodInformer - nodeInformer infov1.NodeInformer - pdbInformer policyv1.PodDisruptionBudgetInformer - nsInformer infov1.NamespaceInformer - podGroupInformer kbinfov1.PodGroupInformer - queueInformer kbinfov1.QueueInformer - pvInformer infov1.PersistentVolumeInformer - pvcInformer infov1.PersistentVolumeClaimInformer - scInformer storagev1.StorageClassInformer - pcInformer schedv1.PriorityClassInformer + podInformer infov1.PodInformer + nodeInformer infov1.NodeInformer + pdbInformer policyv1.PodDisruptionBudgetInformer + nsInformer infov1.NamespaceInformer + podGroupInformerv1alpha1 kbinfov1.PodGroupInformer + podGroupInformerv1alpha2 kbinfov2.PodGroupInformer + queueInformer kbinfov1.QueueInformer + pvInformer infov1.PersistentVolumeInformer + pvcInformer infov1.PersistentVolumeClaimInformer + scInformer storagev1.StorageClassInformer + pcInformer schedv1.PriorityClassInformer Binder Binder Evictor Evictor @@ -182,8 +184,41 @@ func (su *defaultStatusUpdater) UpdatePodCondition(pod *v1.Pod, condition *v1.Po } // UpdatePodGroup will Update pod with podCondition -func (su *defaultStatusUpdater) UpdatePodGroup(pg *v1alpha1.PodGroup) (*v1alpha1.PodGroup, error) { - return su.kbclient.SchedulingV1alpha1().PodGroups(pg.Namespace).Update(pg) +func (su *defaultStatusUpdater) UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error) { + if pg.Version == api.PodGroupVersionV1Alpha1 { + podGroup, err := api.ConvertPodGroupInfoToV1Alpha(pg) + if err != nil { + glog.Errorf("Error while converting PodGroup to v1alpha1.PodGroup with error: %v", err) + } + updated, err := su.kbclient.SchedulingV1alpha1().PodGroups(podGroup.Namespace).Update(podGroup) + if err != nil { + glog.Errorf("Error while updating podgroup with error: %v", err) + } + podGroupInfo, err := api.ConvertV1Alpha1ToPodGroupInfo(updated) + if err != nil { + glog.Errorf("Error While converting v1alpha.Podgroup to api.PodGroup with error: %v", err) + return nil, err + } + return podGroupInfo, nil + } + + if pg.Version == api.PodGroupVersionV1Alpha2 { + podGroup, err := api.ConvertPodGroupInfoToV2Alpha(pg) + if err != nil { + glog.Errorf("Error while converting PodGroup to v1alpha2.PodGroup with error: %v", err) + } + updated, err := su.kbclient.SchedulingV1alpha2().PodGroups(podGroup.Namespace).Update(podGroup) + if err != nil { + glog.Errorf("Error while updating podgroup with error: %v", err) + } + podGroupInfo, err := api.ConvertV1Alpha2ToPodGroupInfo(updated) + if err != nil { + glog.Errorf("Error While converting v2alpha.Podgroup to api.PodGroup with error: %v", err) + return nil, err + } + return podGroupInfo, nil + } + return nil, fmt.Errorf("Provide Proper version of PodGroup, Invalid PodGroup version: %s", pg.Version) } type defaultVolumeBinder struct { @@ -319,12 +354,20 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s }) kbinformer := kbinfo.NewSharedInformerFactory(sc.kbclient, 0) - // create informer for PodGroup information - sc.podGroupInformer = kbinformer.Scheduling().V1alpha1().PodGroups() - sc.podGroupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: sc.AddPodGroup, - UpdateFunc: sc.UpdatePodGroup, - DeleteFunc: sc.DeletePodGroup, + // create informer for 
PodGroup(v1alpha1) information + sc.podGroupInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().PodGroups() + sc.podGroupInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: sc.AddPodGroupAlpha1, + UpdateFunc: sc.UpdatePodGroupAlpha1, + DeleteFunc: sc.DeletePodGroupAlpha1, + }) + + // create informer for PodGroup(v1alpha2) information + sc.podGroupInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().PodGroups() + sc.podGroupInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: sc.AddPodGroupAlpha2, + UpdateFunc: sc.UpdatePodGroupAlpha2, + DeleteFunc: sc.DeletePodGroupAlpha2, }) // create informer for Queue information @@ -343,7 +386,8 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) { go sc.pdbInformer.Informer().Run(stopCh) go sc.podInformer.Informer().Run(stopCh) go sc.nodeInformer.Informer().Run(stopCh) - go sc.podGroupInformer.Informer().Run(stopCh) + go sc.podGroupInformerv1alpha1.Informer().Run(stopCh) + go sc.podGroupInformerv1alpha2.Informer().Run(stopCh) go sc.pvInformer.Informer().Run(stopCh) go sc.pvcInformer.Informer().Run(stopCh) go sc.scInformer.Informer().Run(stopCh) @@ -368,7 +412,8 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool { informerSynced := []cache.InformerSynced{ sc.pdbInformer.Informer().HasSynced, sc.podInformer.Informer().HasSynced, - sc.podGroupInformer.Informer().HasSynced, + sc.podGroupInformerv1alpha1.Informer().HasSynced, + sc.podGroupInformerv1alpha2.Informer().HasSynced, sc.nodeInformer.Informer().HasSynced, sc.pvInformer.Informer().HasSynced, sc.pvcInformer.Informer().HasSynced, @@ -437,7 +482,23 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error { }() if !shadowPodGroup(job.PodGroup) { - sc.Recorder.Eventf(job.PodGroup, v1.EventTypeNormal, "Evict", reason) + if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 { + pg, err := api.ConvertPodGroupInfoToV1Alpha(job.PodGroup) + if err != nil { + glog.Errorf("Error While converting api.PodGroup to v1alpha.PodGroup with error: %v", err) + return err + } + sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason) + } else if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 { + pg, err := api.ConvertPodGroupInfoToV2Alpha(job.PodGroup) + if err != nil { + glog.Errorf("Error While converting api.PodGroup to v2alpha.PodGroup with error: %v", err) + return err + } + sc.Recorder.Eventf(pg, v1.EventTypeNormal, "Evict", reason) + } else { + return fmt.Errorf("Invalid PodGroup Version: %s", job.PodGroup.Version) + } } return nil @@ -693,14 +754,30 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { if !shadowPodGroup(job.PodGroup) { pgUnschedulable := job.PodGroup != nil && - (job.PodGroup.Status.Phase == v1alpha1.PodGroupUnknown || - job.PodGroup.Status.Phase == v1alpha1.PodGroupPending) + (job.PodGroup.Status.Phase == api.PodGroupUnknown || + job.PodGroup.Status.Phase == api.PodGroupPending) pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0 // If pending or unschedulable, record unschedulable event. 
if pgUnschedulable || pdbUnschedulabe { - sc.Recorder.Eventf(job.PodGroup, v1.EventTypeWarning, - string(v1alpha1.PodGroupUnschedulableType), baseErrorMessage) + msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError()) + if job.PodGroup.Version == api.PodGroupVersionV1Alpha1 { + podGroup, err := api.ConvertPodGroupInfoToV1Alpha(job.PodGroup) + if err != nil { + glog.Errorf("Error while converting PodGroup to v1alpha1.PodGroup with error: %v", err) + } + sc.Recorder.Eventf(podGroup, v1.EventTypeWarning, + string(v1alpha1.PodGroupUnschedulableType), msg) + } + + if job.PodGroup.Version == api.PodGroupVersionV1Alpha2 { + podGroup, err := api.ConvertPodGroupInfoToV2Alpha(job.PodGroup) + if err != nil { + glog.Errorf("Error while converting PodGroup to v1alpha2.PodGroup with error: %v", err) + } + sc.Recorder.Eventf(podGroup, v1.EventTypeWarning, + string(v1alpha1.PodGroupUnschedulableType), msg) + } } } diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 9d8207667..f28cee293 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "encoding/json" "fmt" "github.com/golang/glog" @@ -29,6 +30,7 @@ import ( "k8s.io/client-go/tools/cache" kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" + kbv2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" "volcano.sh/volcano/pkg/apis/utils" kbapi "volcano.sh/volcano/pkg/scheduler/api" ) @@ -359,12 +361,12 @@ func (sc *SchedulerCache) DeleteNode(obj interface{}) { return } -func getJobID(pg *kbv1.PodGroup) kbapi.JobID { +func getJobID(pg *kbapi.PodGroup) kbapi.JobID { return kbapi.JobID(fmt.Sprintf("%s/%s", pg.Namespace, pg.Name)) } // Assumes that lock is already acquired. -func (sc *SchedulerCache) setPodGroup(ss *kbv1.PodGroup) error { +func (sc *SchedulerCache) setPodGroup(ss *kbapi.PodGroup) error { job := getJobID(ss) if len(job) == 0 { @@ -386,12 +388,12 @@ func (sc *SchedulerCache) setPodGroup(ss *kbv1.PodGroup) error { } // Assumes that lock is already acquired. -func (sc *SchedulerCache) updatePodGroup(oldQueue, newQueue *kbv1.PodGroup) error { +func (sc *SchedulerCache) updatePodGroup(oldQueue, newQueue *kbapi.PodGroup) error { return sc.setPodGroup(newQueue) } // Assumes that lock is already acquired. 
-func (sc *SchedulerCache) deletePodGroup(ss *kbv1.PodGroup) error { +func (sc *SchedulerCache) deletePodGroup(ss *kbapi.PodGroup) error { jobID := getJobID(ss) job, found := sc.Jobs[jobID] @@ -407,19 +409,32 @@ func (sc *SchedulerCache) deletePodGroup(ss *kbv1.PodGroup) error { return nil } -// AddPodGroup add podgroup to scheduler cache -func (sc *SchedulerCache) AddPodGroup(obj interface{}) { +// AddPodGroupAlpha1 add podgroup to scheduler cache +func (sc *SchedulerCache) AddPodGroupAlpha1(obj interface{}) { ss, ok := obj.(*kbv1.PodGroup) if !ok { glog.Errorf("Cannot convert to *kbv1.PodGroup: %v", obj) return } + marshalled, err := json.Marshal(*ss) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", ss.Name, err) + } + + pg := &kbapi.PodGroup{} + err = json.Unmarshal(marshalled, pg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err) + } + pg.Version = kbapi.PodGroupVersionV1Alpha1 + sc.Mutex.Lock() defer sc.Mutex.Unlock() glog.V(4).Infof("Add PodGroup(%s) into cache, spec(%#v)", ss.Name, ss.Spec) - err := sc.setPodGroup(ss) + + err = sc.setPodGroup(pg) if err != nil { glog.Errorf("Failed to add PodGroup %s into cache: %v", ss.Name, err) return @@ -427,8 +442,41 @@ func (sc *SchedulerCache) AddPodGroup(obj interface{}) { return } -// UpdatePodGroup add podgroup to scheduler cache -func (sc *SchedulerCache) UpdatePodGroup(oldObj, newObj interface{}) { +// AddPodGroupAlpha2 add podgroup to scheduler cache +func (sc *SchedulerCache) AddPodGroupAlpha2(obj interface{}) { + ss, ok := obj.(*kbv2.PodGroup) + if !ok { + glog.Errorf("Cannot convert to *kbv2.PodGroup: %v", obj) + return + } + + marshalled, err := json.Marshal(*ss) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", ss.Name, err) + } + + pg := &kbapi.PodGroup{} + err = json.Unmarshal(marshalled, pg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err) + } + pg.Version = kbapi.PodGroupVersionV1Alpha2 + + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + + glog.V(4).Infof("Add PodGroup(%s) into cache, spec(%#v)", ss.Name, ss.Spec) + + err = sc.setPodGroup(pg) + if err != nil { + glog.Errorf("Failed to add PodGroup %s into cache: %v", ss.Name, err) + return + } + return +} + +// UpdatePodGroupAlpha1 add podgroup to scheduler cache +func (sc *SchedulerCache) UpdatePodGroupAlpha1(oldObj, newObj interface{}) { oldSS, ok := oldObj.(*kbv1.PodGroup) if !ok { glog.Errorf("Cannot convert oldObj to *kbv1.SchedulingSpec: %v", oldObj) @@ -440,10 +488,35 @@ func (sc *SchedulerCache) UpdatePodGroup(oldObj, newObj interface{}) { return } + oldMarshalled, err := json.Marshal(*oldSS) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", oldSS.Name, err) + } + + oldPg := &kbapi.PodGroup{} + oldPg.Version = kbapi.PodGroupVersionV1Alpha1 + + err = json.Unmarshal(oldMarshalled, oldPg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err) + } + + newMarshalled, err := json.Marshal(*newSS) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", newSS.Name, err) + } + + newPg := &kbapi.PodGroup{} + newPg.Version = kbapi.PodGroupVersionV1Alpha1 + + err = json.Unmarshal(newMarshalled, newPg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err) + } sc.Mutex.Lock() defer sc.Mutex.Unlock() - err := sc.updatePodGroup(oldSS, newSS) + err = sc.updatePodGroup(oldPg, 
newPg) if err != nil { glog.Errorf("Failed to update SchedulingSpec %s into cache: %v", oldSS.Name, err) return @@ -451,8 +524,58 @@ func (sc *SchedulerCache) UpdatePodGroup(oldObj, newObj interface{}) { return } -// DeletePodGroup delete podgroup from scheduler cache -func (sc *SchedulerCache) DeletePodGroup(obj interface{}) { +// UpdatePodGroupAlpha2 add podgroup to scheduler cache +func (sc *SchedulerCache) UpdatePodGroupAlpha2(oldObj, newObj interface{}) { + oldSS, ok := oldObj.(*kbv2.PodGroup) + if !ok { + glog.Errorf("Cannot convert oldObj to *kbv2.SchedulingSpec: %v", oldObj) + return + } + newSS, ok := newObj.(*kbv2.PodGroup) + if !ok { + glog.Errorf("Cannot convert newObj to *kbv2.SchedulingSpec: %v", newObj) + return + } + + oldMarshalled, err := json.Marshal(*oldSS) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", oldSS.Name, err) + } + + oldPg := &kbapi.PodGroup{} + oldPg.Version = kbapi.PodGroupVersionV1Alpha2 + + err = json.Unmarshal(oldMarshalled, oldPg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err) + } + + newMarshalled, err := json.Marshal(*newSS) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", newSS.Name, err) + } + + newPg := &kbapi.PodGroup{} + newPg.Version = kbapi.PodGroupVersionV1Alpha2 + + err = json.Unmarshal(newMarshalled, newPg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err) + } + + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + + err = sc.updatePodGroup(oldPg, newPg) + if err != nil { + glog.Errorf("Failed to update SchedulingSpec %s into cache: %v", oldSS.Name, err) + return + } + return +} + +// DeletePodGroupAlpha1 delete podgroup from scheduler cache +func (sc *SchedulerCache) DeletePodGroupAlpha1(obj interface{}) { var ss *kbv1.PodGroup switch t := obj.(type) { case *kbv1.PodGroup: @@ -469,10 +592,63 @@ func (sc *SchedulerCache) DeletePodGroup(obj interface{}) { return } + marshalled, err := json.Marshal(*ss) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", ss.Name, err) + } + + pg := &kbapi.PodGroup{} + pg.Version = kbapi.PodGroupVersionV1Alpha1 + err = json.Unmarshal(marshalled, pg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err) + } + sc.Mutex.Lock() defer sc.Mutex.Unlock() - err := sc.deletePodGroup(ss) + err = sc.deletePodGroup(pg) + if err != nil { + glog.Errorf("Failed to delete SchedulingSpec %s from cache: %v", ss.Name, err) + return + } + return +} + +// DeletePodGroupAlpha2 delete podgroup from scheduler cache +func (sc *SchedulerCache) DeletePodGroupAlpha2(obj interface{}) { + var ss *kbv2.PodGroup + switch t := obj.(type) { + case *kbv2.PodGroup: + ss = t + case cache.DeletedFinalStateUnknown: + var ok bool + ss, ok = t.Obj.(*kbv2.PodGroup) + if !ok { + glog.Errorf("Cannot convert to *kbv2.SchedulingSpec: %v", t.Obj) + return + } + default: + glog.Errorf("Cannot convert to *kbv2.SchedulingSpec: %v", t) + return + } + + marshalled, err := json.Marshal(*ss) + if err != nil { + glog.Errorf("Failed to Marshal podgroup %s with error: %v", ss.Name, err) + } + + pg := &kbapi.PodGroup{} + pg.Version = kbapi.PodGroupVersionV1Alpha2 + err = json.Unmarshal(marshalled, pg) + if err != nil { + glog.Errorf("Failed to Unmarshal Data into api.PodGroup type with error: %v", err) + } + + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + + err = sc.deletePodGroup(pg) if err != nil { glog.Errorf("Failed to delete 
SchedulingSpec %s from cache: %v", ss.Name, err) return diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index c68cbcbd9..c07e083a5 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -18,7 +18,6 @@ package cache import ( v1 "k8s.io/api/core/v1" - "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/scheduler/api" ) @@ -74,5 +73,5 @@ type Evictor interface { // StatusUpdater updates pod with given PodCondition type StatusUpdater interface { UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error) - UpdatePodGroup(pg *v1alpha1.PodGroup) (*v1alpha1.PodGroup, error) + UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error) } diff --git a/pkg/scheduler/cache/util.go b/pkg/scheduler/cache/util.go index 554b90f4b..2e1340d74 100644 --- a/pkg/scheduler/cache/util.go +++ b/pkg/scheduler/cache/util.go @@ -20,7 +20,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/apis/utils" "volcano.sh/volcano/pkg/scheduler/api" ) @@ -29,7 +28,7 @@ const ( shadowPodGroupKey = "volcano/shadow-pod-group" ) -func shadowPodGroup(pg *v1alpha1.PodGroup) bool { +func shadowPodGroup(pg *api.PodGroup) bool { if pg == nil { return true } @@ -39,13 +38,13 @@ func shadowPodGroup(pg *v1alpha1.PodGroup) bool { return found } -func createShadowPodGroup(pod *v1.Pod) *v1alpha1.PodGroup { +func createShadowPodGroup(pod *v1.Pod) *api.PodGroup { jobID := api.JobID(utils.GetController(pod)) if len(jobID) == 0 { jobID = api.JobID(pod.UID) } - return &v1alpha1.PodGroup{ + return &api.PodGroup{ ObjectMeta: metav1.ObjectMeta{ Namespace: pod.Namespace, Name: string(jobID), @@ -53,7 +52,7 @@ func createShadowPodGroup(pod *v1.Pod) *v1alpha1.PodGroup { shadowPodGroupKey: string(jobID), }, }, - Spec: v1alpha1.PodGroupSpec{ + Spec: api.PodGroupSpec{ MinMember: 1, }, } diff --git a/pkg/scheduler/framework/job_updater.go b/pkg/scheduler/framework/job_updater.go index ea361c6a3..8d5b826e4 100644 --- a/pkg/scheduler/framework/job_updater.go +++ b/pkg/scheduler/framework/job_updater.go @@ -10,7 +10,6 @@ import ( "k8s.io/client-go/util/workqueue" - "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/scheduler/api" ) @@ -52,7 +51,7 @@ func (ju *jobUpdater) UpdateAll() { workqueue.ParallelizeUntil(context.TODO(), jobUpdaterWorker, len(ju.jobQueue), ju.updateJob) } -func isPodGroupConditionsUpdated(newCondition, oldCondition []v1alpha1.PodGroupCondition) bool { +func isPodGroupConditionsUpdated(newCondition, oldCondition []api.PodGroupCondition) bool { if len(newCondition) != len(oldCondition) { return true } @@ -85,7 +84,7 @@ func isPodGroupConditionsUpdated(newCondition, oldCondition []v1alpha1.PodGroupC return false } -func isPodGroupStatusUpdated(newStatus, oldStatus *v1alpha1.PodGroupStatus) bool { +func isPodGroupStatusUpdated(newStatus, oldStatus *api.PodGroupStatus) bool { newCondition := newStatus.Conditions newStatus.Conditions = nil oldCondition := oldStatus.Conditions diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 61d5fa84f..25b7a96fe 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -26,7 +26,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" - "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/cache" 
"volcano.sh/volcano/pkg/scheduler/conf" @@ -39,7 +38,7 @@ type Session struct { cache cache.Cache - podGroupStatus map[api.JobID]*v1alpha1.PodGroupStatus + podGroupStatus map[api.JobID]*api.PodGroupStatus Jobs map[api.JobID]*api.JobInfo Nodes map[string]*api.NodeInfo @@ -71,7 +70,7 @@ func openSession(cache cache.Cache) *Session { UID: uuid.NewUUID(), cache: cache, - podGroupStatus: map[api.JobID]*v1alpha1.PodGroupStatus{}, + podGroupStatus: map[api.JobID]*api.PodGroupStatus{}, Jobs: map[api.JobID]*api.JobInfo{}, Nodes: map[string]*api.NodeInfo{}, @@ -101,13 +100,13 @@ func openSession(cache cache.Cache) *Session { for _, job := range ssn.Jobs { // only conditions will be updated periodically if job.PodGroup != nil && job.PodGroup.Status.Conditions != nil { - ssn.podGroupStatus[job.UID] = job.PodGroup.Status.DeepCopy() + ssn.podGroupStatus[job.UID] = &job.PodGroup.Status } if vjr := ssn.JobValid(job); vjr != nil { if !vjr.Pass { - jc := &v1alpha1.PodGroupCondition{ - Type: v1alpha1.PodGroupUnschedulableType, + jc := &api.PodGroupCondition{ + Type: api.PodGroupUnschedulableType, Status: v1.ConditionTrue, LastTransitionTime: metav1.Now(), TransitionID: string(ssn.UID), @@ -148,12 +147,12 @@ func closeSession(ssn *Session) { glog.V(3).Infof("Close Session %v", ssn.UID) } -func jobStatus(ssn *Session, jobInfo *api.JobInfo) v1alpha1.PodGroupStatus { +func jobStatus(ssn *Session, jobInfo *api.JobInfo) api.PodGroupStatus { status := jobInfo.PodGroup.Status unschedulable := false for _, c := range status.Conditions { - if c.Type == v1alpha1.PodGroupUnschedulableType && + if c.Type == api.PodGroupUnschedulableType && c.Status == v1.ConditionTrue && c.TransitionID == string(ssn.UID) { @@ -164,7 +163,7 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) v1alpha1.PodGroupStatus { // If running tasks && unschedulable, unknown phase if len(jobInfo.TaskStatusIndex[api.Running]) != 0 && unschedulable { - status.Phase = v1alpha1.PodGroupUnknown + status.Phase = api.PodGroupUnknown } else { allocated := 0 for status, tasks := range jobInfo.TaskStatusIndex { @@ -175,9 +174,9 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) v1alpha1.PodGroupStatus { // If there're enough allocated resource, it's running if int32(allocated) >= jobInfo.PodGroup.Spec.MinMember { - status.Phase = v1alpha1.PodGroupRunning - } else if jobInfo.PodGroup.Status.Phase != v1alpha1.PodGroupInqueue { - status.Phase = v1alpha1.PodGroupPending + status.Phase = api.PodGroupRunning + } else if jobInfo.PodGroup.Status.Phase != api.PodGroupInqueue { + status.Phase = api.PodGroupPending } } @@ -363,7 +362,7 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { } // UpdateJobCondition update job condition accordingly. 
-func (ssn *Session) UpdateJobCondition(jobInfo *api.JobInfo, cond *v1alpha1.PodGroupCondition) error { +func (ssn *Session) UpdateJobCondition(jobInfo *api.JobInfo, cond *api.PodGroupCondition) error { job, ok := ssn.Jobs[jobInfo.UID] if !ok { return fmt.Errorf("failed to find job <%s/%s>", jobInfo.Namespace, jobInfo.Name) diff --git a/pkg/scheduler/plugins/gang/gang.go b/pkg/scheduler/plugins/gang/gang.go index a29cc8072..77fa4526e 100644 --- a/pkg/scheduler/plugins/gang/gang.go +++ b/pkg/scheduler/plugins/gang/gang.go @@ -146,8 +146,8 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) { metrics.UpdateUnscheduleTaskCount(job.Name, int(unreadyTaskCount)) metrics.RegisterJobRetries(job.Name) - jc := &v1alpha1.PodGroupCondition{ - Type: v1alpha1.PodGroupUnschedulableType, + jc := &api.PodGroupCondition{ + Type: api.PodGroupUnschedulableType, Status: v1.ConditionTrue, LastTransitionTime: metav1.Now(), TransitionID: string(ssn.UID), diff --git a/pkg/scheduler/util/test_utils.go b/pkg/scheduler/util/test_utils.go index 8cc42a5a0..8e4f6f7bc 100644 --- a/pkg/scheduler/util/test_utils.go +++ b/pkg/scheduler/util/test_utils.go @@ -143,7 +143,7 @@ func (ftsu *FakeStatusUpdater) UpdatePodCondition(pod *v1.Pod, podCondition *v1. } // UpdatePodGroup is a empty function -func (ftsu *FakeStatusUpdater) UpdatePodGroup(pg *kbv1.PodGroup) (*kbv1.PodGroup, error) { +func (ftsu *FakeStatusUpdater) UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error) { // do nothing here return nil, nil }
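
Note on the conversion pattern: the helpers added in pkg/scheduler/api/pod_group_info.go (ConvertV1Alpha1ToPodGroupInfo, ConvertPodGroupInfoToV1Alpha, and the v1alpha2 counterparts) convert between the versioned API objects and the scheduler-internal api.PodGroup via a JSON marshal/unmarshal round trip over structurally compatible fields. The internal Version field has no counterpart in the versioned types, so it never survives the round trip and is assigned explicitly afterwards (e.g. convertedPg.Version = PodGroupVersionV1Alpha1). Below is a minimal, self-contained sketch of that pattern; VersionedPodGroup and InternalPodGroup are hypothetical stand-ins for illustration only, not the actual v1alpha1/v1alpha2 or api types.

// conversion_sketch.go — illustrative only; mirrors the JSON round-trip approach
// used by the new conversion helpers, with stand-in types.
package main

import (
	"encoding/json"
	"fmt"
)

// VersionedSpec/VersionedPodGroup stand in for a versioned API type such as
// v1alpha1.PodGroup; field names and JSON tags line up with the internal type.
type VersionedSpec struct {
	MinMember int32  `json:"minMember,omitempty"`
	Queue     string `json:"queue,omitempty"`
}

type VersionedPodGroup struct {
	Name string        `json:"name,omitempty"`
	Spec VersionedSpec `json:"spec,omitempty"`
}

// InternalPodGroup stands in for the scheduler-internal type, which carries an
// extra Version field that the versioned API objects do not have.
type InternalPodGroup struct {
	Name    string        `json:"name,omitempty"`
	Spec    VersionedSpec `json:"spec,omitempty"`
	Version string        // not present in the versioned JSON, so set explicitly
}

// convertToInternal follows the same shape as the helpers in pod_group_info.go:
// marshal the versioned object, unmarshal into the internal type, then record
// which API version the object came from.
func convertToInternal(pg *VersionedPodGroup, version string) (*InternalPodGroup, error) {
	data, err := json.Marshal(pg)
	if err != nil {
		return nil, fmt.Errorf("marshal versioned podgroup %s: %v", pg.Name, err)
	}
	internal := &InternalPodGroup{}
	if err := json.Unmarshal(data, internal); err != nil {
		return nil, fmt.Errorf("unmarshal into internal podgroup: %v", err)
	}
	internal.Version = version
	return internal, nil
}

func main() {
	in := &VersionedPodGroup{Name: "pg1", Spec: VersionedSpec{MinMember: 3, Queue: "default"}}
	out, err := convertToInternal(in, "v1alpha1")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: minMember=%d queue=%s version=%s\n",
		out.Name, out.Spec.MinMember, out.Spec.Queue, out.Version)
}

The explicit Version assignment is what lets version-agnostic callers such as defaultStatusUpdater.UpdatePodGroup and SchedulerCache.Evict decide later whether to convert back to the v1alpha1 or v1alpha2 client type.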