Merge pull request #210 from jinzhejz/bugfix
Bug fix: update PDB for QueueJob when its Replicas is changed
This commit is contained in:
commit
2018482fc1
|
|
@ -80,6 +80,9 @@ type Controller struct {
|
|||
// this is used to avoid to re-create the pods of a QueueJob before
|
||||
// all the old pods are terminated
|
||||
deletedPodsCounter *syncCounterMap
|
||||
|
||||
// A map that store PDB name for each QueueJob
|
||||
queueJobToPDB map[string]string
|
||||
}
|
||||
|
||||
// NewController create new QueueJob Controller
|
||||
|
|
@ -91,6 +94,7 @@ func NewController(config *rest.Config) *Controller {
|
|||
initQueue: cache.NewFIFO(queueJobKey),
|
||||
updateQueue: cache.NewFIFO(queueJobKey),
|
||||
deletedPodsCounter: newSyncCounterMap(),
|
||||
queueJobToPDB: make(map[string]string),
|
||||
}
|
||||
|
||||
queueJobClient, _, err := client.NewClient(cc.config)
|
||||
|
|
@ -163,7 +167,7 @@ func (cc *Controller) addQueueJob(obj interface{}) {
|
|||
}
|
||||
|
||||
func (cc *Controller) updateQueueJob(oldObj, newObj interface{}) {
|
||||
_, ok := oldObj.(*arbv1.QueueJob)
|
||||
oldQJ, ok := oldObj.(*arbv1.QueueJob)
|
||||
if !ok {
|
||||
glog.Errorf("oldObj is not QueueJob")
|
||||
return
|
||||
|
|
@ -174,6 +178,35 @@ func (cc *Controller) updateQueueJob(oldObj, newObj interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
if _, ok := cc.queueJobToPDB[newQJ.Name]; !ok {
|
||||
// create PDB for the QueueJob if controller doesn't create it before
|
||||
pdb, err := cc.initPDB(int(newQJ.Spec.Replicas), newQJ.Spec.Template.Labels)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to create PDB for QueueJob %s, err %#v", newQJ.Name, err)
|
||||
return
|
||||
}
|
||||
cc.queueJobToPDB[newQJ.Name] = pdb
|
||||
}
|
||||
|
||||
if oldQJ.Spec.Replicas != newQJ.Spec.Replicas {
|
||||
// update PDB for queuejob to support gang scheduling if its Replicas is changed
|
||||
// 1. delete old pdb
|
||||
oldPDB, ok := cc.queueJobToPDB[newQJ.Name]
|
||||
if ok {
|
||||
err := cc.clients.Policy().PodDisruptionBudgets("default").Delete(oldPDB, &metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to delete PDB for QueueJob %s, err %#v", newQJ.Name, err)
|
||||
}
|
||||
}
|
||||
// 2. create a new pdb
|
||||
newPDB, err := cc.initPDB(int(newQJ.Spec.Replicas), newQJ.Spec.Template.Labels)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to create PDB for QueueJob %s, err %#v", newQJ.Name, err)
|
||||
return
|
||||
}
|
||||
cc.queueJobToPDB[newQJ.Name] = newPDB
|
||||
}
|
||||
|
||||
cc.enqueueUpdateQueue(newQJ)
|
||||
}
|
||||
|
||||
|
|
@ -235,13 +268,6 @@ func (cc *Controller) initWorker() {
|
|||
return
|
||||
}
|
||||
|
||||
// create PDB for QueueJob to support gang-scheduling
|
||||
err = cc.initPDB(int(queuejob.Spec.Replicas), queuejob.Spec.Template.Labels)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to create PDB for QueueJob %s, err %#v", queuejob.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Add labels and selectors which are used by controller for a QueueJob
|
||||
// And update to api server
|
||||
err = cc.initLabelsForQueueJob(queuejob)
|
||||
|
|
@ -317,14 +343,15 @@ func (cc *Controller) updateWorker() {
|
|||
}
|
||||
}
|
||||
|
||||
func (cc *Controller) initPDB(min int, selectorMap map[string]string) error {
|
||||
func (cc *Controller) initPDB(min int, selectorMap map[string]string) (string, error) {
|
||||
pdbName := fmt.Sprintf("pdb-%s", generateUUID())
|
||||
selector := &metav1.LabelSelector{
|
||||
MatchLabels: selectorMap,
|
||||
}
|
||||
minAvailable := intstr.FromInt(min)
|
||||
pdb := &v1beta1.PodDisruptionBudget{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("pdb-%s", generateUUID()),
|
||||
Name: pdbName,
|
||||
},
|
||||
Spec: v1beta1.PodDisruptionBudgetSpec{
|
||||
Selector: selector,
|
||||
|
|
@ -334,7 +361,7 @@ func (cc *Controller) initPDB(min int, selectorMap map[string]string) error {
|
|||
|
||||
_, err := cc.clients.Policy().PodDisruptionBudgets("default").Create(pdb)
|
||||
|
||||
return err
|
||||
return pdbName, err
|
||||
}
|
||||
|
||||
func (cc *Controller) syncQueueJob(qj *arbv1.QueueJob) (bool, error) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue