Merge pull request #60 from jinzhejz/jinzhejz_preemption

Init resources preemption between queues
Klaus Ma 2017-10-18 15:59:21 +08:00 committed by GitHub
commit 4ac40bc38b
9 changed files with 628 additions and 30 deletions

View File

@@ -20,6 +20,7 @@ import (
"github.com/kubernetes-incubator/kube-arbitrator/cmd/kube-arbitrator/app/options"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/controller"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/policy"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/policy/preemption"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/schedulercache"
"k8s.io/client-go/rest"
@@ -44,7 +45,7 @@ func Run(opt *options.ServerOption) error {
go cache.Run(neverStop)
// TODO dump cache information and do something
-c := controller.NewQueueController(config, cache, policy.New(opt.Policy))
+c := controller.NewQueueController(config, cache, policy.New(opt.Policy), preemption.New(config))
c.Run()
<-neverStop

View File

@@ -19,10 +19,8 @@ package controller
import (
"time"
"github.com/golang/glog"
apiv1 "github.com/kubernetes-incubator/kube-arbitrator/pkg/apis/v1"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/client"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/policy"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/policy/preemption"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/schedulercache"
"k8s.io/apimachinery/pkg/util/wait"
@@ -33,14 +31,16 @@ type QueueController struct {
config *rest.Config
cache schedulercache.Cache
allocator policy.Interface
+preemptor preemption.Interface
quotaManager *quotaManager
}
-func NewQueueController(config *rest.Config, cache schedulercache.Cache, allocator policy.Interface) *QueueController {
+func NewQueueController(config *rest.Config, cache schedulercache.Cache, allocator policy.Interface, preemptor preemption.Interface) *QueueController {
queueController := &QueueController{
config: config,
cache: cache,
allocator: allocator,
+preemptor: preemptor,
quotaManager: &quotaManager{
config: config,
},
@@ -51,6 +51,7 @@ func NewQueueController(config *rest.Config, cache schedulercache.Cache, allocat
func (q *QueueController) Run() {
go q.quotaManager.Run()
+go q.preemptor.Run(wait.NeverStop)
go wait.Until(q.runOnce, 2*time.Second, wait.NeverStop)
}
@@ -59,26 +60,6 @@ func (q *QueueController) runOnce() {
jobGroups := q.allocator.Group(snapshot.Queues)
queues := q.allocator.Allocate(jobGroups, snapshot.Nodes)
-q.updateQueues(queues)
+queuesForPreempt, _ := q.preemptor.Preprocessing(queues, snapshot.Pods)
+q.preemptor.PreemptResources(queuesForPreempt)
}
-func (q *QueueController) updateQueues(queues map[string]*schedulercache.QueueInfo) {
-queueClient, _, err := client.NewClient(q.config)
-if err != nil {
-glog.Error("fail to create queue client")
-return
-}
-for _, queue := range queues {
-result := apiv1.Queue{}
-err = queueClient.Put().
-Resource(apiv1.QueuePlural).
-Namespace(queue.Queue().Namespace).
-Name(queue.Queue().Name).
-Body(queue.Queue()).
-Do().Into(&result)
-if err != nil {
-glog.Errorf("fail to update queue info, name %s, %#v", queue.Queue().Name, err)
-}
-}
-}

View File

@@ -78,14 +78,14 @@ func (qm *quotaManager) updateQuotas(queues []apiv1.Queue) {
}
updatedRq := rqList.Items[0].DeepCopy()
-if cpuQuantity, ok := queue.Status.Deserved.Resources["cpu"]; ok {
+if cpuQuantity, ok := queue.Status.Allocated.Resources["cpu"]; ok {
if cpu, ok := (&cpuQuantity).AsInt64(); ok {
cpuQuota := *resource.NewQuantity(cpu, resource.DecimalSI)
updatedRq.Spec.Hard["limits.cpu"] = cpuQuota
updatedRq.Spec.Hard["requests.cpu"] = cpuQuota
}
}
-if memoryQuantity, ok := queue.Status.Deserved.Resources["memory"]; ok {
+if memoryQuantity, ok := queue.Status.Allocated.Resources["memory"]; ok {
if memory, ok := (&memoryQuantity).AsInt64(); ok {
memoryQuota := *resource.NewQuantity(memory, resource.BinarySI)
updatedRq.Spec.Hard["limits.memory"] = memoryQuota

View File

@@ -0,0 +1,33 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package preemption
import (
"github.com/kubernetes-incubator/kube-arbitrator/pkg/schedulercache"
)
// Interface is the interface of preemption.
type Interface interface {
// Run starts the pod informer.
Run(stopCh <-chan struct{})
// Preprocessing kills pods until each queue is underused (Used <= Allocated).
Preprocessing(queues map[string]*schedulercache.QueueInfo, pods []*schedulercache.PodInfo) (map[string]*schedulercache.QueueInfo, error)
// PreemptResources preempts resources between queues.
PreemptResources(queues map[string]*schedulercache.QueueInfo) error
}
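For orientation, a minimal no-op implementation of this interface would look as follows. This is an illustrative sketch only; noopPreemption is not a type from this commit:

package preemption

import (
	"github.com/kubernetes-incubator/kube-arbitrator/pkg/schedulercache"
)

// noopPreemption satisfies Interface but never kills pods; a stub like this
// could serve as a starting point for tests or alternative policies.
type noopPreemption struct{}

// Run starts nothing; there is no informer to run.
func (n *noopPreemption) Run(stopCh <-chan struct{}) {}

// Preprocessing returns the queues untouched.
func (n *noopPreemption) Preprocessing(queues map[string]*schedulercache.QueueInfo, pods []*schedulercache.PodInfo) (map[string]*schedulercache.QueueInfo, error) {
	return queues, nil
}

// PreemptResources preempts nothing.
func (n *noopPreemption) PreemptResources(queues map[string]*schedulercache.QueueInfo) error {
	return nil
}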

View File

@@ -0,0 +1,522 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package preemption
import (
"strings"
"sync"
"github.com/golang/glog"
apiv1 "github.com/kubernetes-incubator/kube-arbitrator/pkg/apis/v1"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/client"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/schedulercache"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
type Resources map[apiv1.ResourceName]resource.Quantity
type preemptedPodInfo struct {
pod *v1.Pod
totalReleasingResources Resources
detailReleasingResources map[string]Resources
}
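// Illustrative note (not part of the commit): totalReleasingResources is the
// total the dying pod frees, while detailReleasingResources splits that total
// by the namespace of each queue that receives a share of it.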
type basePreemption struct {
dataMu *sync.Mutex
updateMu *sync.Mutex
dataCond *sync.Cond
name string
config *rest.Config
client *kubernetes.Clientset
terminatingPodsForPreempt map[string]preemptedPodInfo
terminatingPodsForUnderused map[string]*v1.Pod
}
func New(config *rest.Config) Interface {
return newBasePreemption("base-preemption", config)
}
func newBasePreemption(name string, config *rest.Config) *basePreemption {
bp := &basePreemption{
dataMu: new(sync.Mutex),
updateMu: new(sync.Mutex),
name: name,
config: config,
client: kubernetes.NewForConfigOrDie(config),
terminatingPodsForPreempt: make(map[string]preemptedPodInfo),
terminatingPodsForUnderused: make(map[string]*v1.Pod),
}
bp.dataCond = sync.NewCond(bp.dataMu)
return bp
}
func calculatePodResources(pod *v1.Pod) map[string]resource.Quantity {
totalResource := make(map[string]resource.Quantity)
for _, container := range pod.Spec.Containers {
for k, v := range container.Resources.Requests {
// only cpu/memory resources are handled for now
if k.String() != "cpu" && k.String() != "memory" {
continue
}
if _, ok := totalResource[k.String()]; ok {
result := totalResource[k.String()].DeepCopy()
result.Add(v)
totalResource[k.String()] = result
} else {
totalResource[k.String()] = v
}
}
}
return totalResource
}
func killPod(client *kubernetes.Clientset, pod *v1.Pod) error {
err := client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &meta_v1.DeleteOptions{})
return err
}
// -1 - if res1 < res2 (both cpu and memory are strictly less)
// 0 - if res1 = res2 (both cpu and memory are equal)
// 1 - otherwise (greater, or a mixed result)
func compareResources(res1 map[apiv1.ResourceName]resource.Quantity, res2 map[apiv1.ResourceName]resource.Quantity) int {
cpu1 := res1["cpu"].DeepCopy()
cpu2 := res2["cpu"].DeepCopy()
memory1 := res1["memory"].DeepCopy()
memory2 := res2["memory"].DeepCopy()
if cpu1.Cmp(cpu2) < 0 && memory1.Cmp(memory2) < 0 {
return -1
} else if cpu1.Cmp(cpu2) == 0 && memory1.Cmp(memory2) == 0 {
return 0
} else {
return 1
}
}
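// Worked example (illustrative, not in the commit): res1{cpu:2, memory:1Gi}
// vs res2{cpu:1, memory:2Gi} matches neither the -1 nor the 0 case, so the
// result is 1; mixed comparisons are deliberately lumped in with "greater".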
func (p *basePreemption) startPodInformer(stopCh <-chan struct{}) {
informerFactory := informers.NewSharedInformerFactory(p.client, 0)
podInformer := informerFactory.Core().V1().Pods()
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
glog.V(4).Infof("filter pod name(%s) namespace(%s) status(%s)\n", t.Name, t.Namespace, t.Status.Phase)
return true
default:
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
DeleteFunc: p.terminatePodDone,
},
})
go podInformer.Informer().Run(stopCh)
}
func (p *basePreemption) Run(stopCh <-chan struct{}) {
p.startPodInformer(stopCh)
}
func (p *basePreemption) Preprocessing(queues map[string]*schedulercache.QueueInfo, pods []*schedulercache.PodInfo) (map[string]*schedulercache.QueueInfo, error) {
// kill running pods for each queue to make Used <= Allocated
p.dataMu.Lock()
defer p.dataMu.Unlock()
// calculate used resources for each queue
for _, q := range queues {
for _, pod := range pods {
if strings.Compare(q.Queue().Namespace, pod.Pod().Namespace) != 0 {
continue
}
if _, ok := p.terminatingPodsForUnderused[pod.Name()]; ok {
continue
}
if _, ok := p.terminatingPodsForPreempt[pod.Name()]; ok {
continue
}
q.Pods[pod.Name()] = pod.Pod()
podResources := calculatePodResources(pod.Pod())
glog.V(4).Infof("Preprocessing() total resources for pod %s, %#v", pod.Name(), podResources)
for k, v := range podResources {
if k != "cpu" && k != "memory" {
continue
}
resType := apiv1.ResourceName(k)
if _, ok := q.Queue().Status.Used.Resources[resType]; ok {
result := q.Queue().Status.Used.Resources[resType].DeepCopy()
result.Add(v)
q.Queue().Status.Used.Resources[resType] = result
} else {
q.Queue().Status.Used.Resources[resType] = v
}
}
}
glog.V(4).Infof("Preprocessing calculate queue, queue %s, deserved (%#v), allocated (%#v), used (%#v), preempting (%#v)",
q.Name(), q.Queue().Status.Deserved.Resources, q.Queue().Status.Allocated.Resources,
q.Queue().Status.Used.Resources, q.Queue().Status.Preempting.Resources)
}
// kill pod to make queue Used <= Allocated
for _, q := range queues {
for k, pod := range q.Pods {
if q.UsedUnderAllocated() {
glog.V(4).Infof("Preprocessing queue %s is underused, used <= allocated, try next queue", q.Name())
break
}
// choose a pod to kill and check used <= allocated again
podResources := calculatePodResources(pod)
if err := killPod(p.client, pod); err == nil {
// kill successfully
delete(q.Pods, k)
p.terminatingPodsForUnderused[pod.Name] = pod
for k, v := range podResources {
if k != "cpu" && k != "memory" {
continue
}
resType := apiv1.ResourceName(k)
if _, ok := q.Queue().Status.Used.Resources[resType]; ok {
result := q.Queue().Status.Used.Resources[resType].DeepCopy()
result.Sub(v)
q.Queue().Status.Used.Resources[resType] = result
} else {
glog.Errorf("cannot find resource %s in queue used resource", k)
}
}
} else {
// TODO may need some error handling when kill pod failed
glog.Errorf("failed to kill pod %s", pod.Name)
}
}
glog.V(4).Infof("Preprocessing after kill pods, queue %s, deserved (%#v), allocated (%#v), used (%#v), preempting (%#v)",
q.Name(), q.Queue().Status.Deserved.Resources, q.Queue().Status.Allocated.Resources,
q.Queue().Status.Used.Resources, q.Queue().Status.Preempting.Resources)
}
return queues, nil
}
func (p *basePreemption) PreemptResources(queues map[string]*schedulercache.QueueInfo) error {
// Divide queues into three categories
// queuesOverused - Deserved < Allocated
// queuesPerfectused - Deserved = Allocated
// queuesUnderused - Deserved > Allocated
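// Illustrative examples (not in the commit): Deserved{cpu:1,memory:1Gi} vs
// Allocated{cpu:2,memory:2Gi} is overused; equal maps are perfectused; any
// mixed result (e.g. more cpu but less memory) falls into queuesUnderused.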
queuesOverused := make(map[string]*schedulercache.QueueInfo)
queuesPerfectused := make(map[string]*schedulercache.QueueInfo)
queuesUnderused := make(map[string]*schedulercache.QueueInfo)
for _, q := range queues {
result := compareResources(q.Queue().Status.Deserved.Resources, q.Queue().Status.Allocated.Resources)
if result == -1 {
queuesOverused[q.Name()] = q
} else if result == 0 {
queuesPerfectused[q.Name()] = q
} else if result == 1 {
queuesUnderused[q.Name()] = q
}
}
glog.V(4).Infof("PreemptResource after divided, queuesOverused(%d), queuesPerfectused(%d), queuesUnderused(%d)",
len(queuesOverused), len(queuesPerfectused), len(queuesUnderused))
// handle queuesOverused, whose resources will be preempted for other queues
preemptingPods := make(map[string]preemptedPodInfo)
for _, q := range queuesOverused {
result := compareResources(q.Queue().Status.Used.Resources, q.Queue().Status.Deserved.Resources)
if result <= 0 {
// Used <= Deserved
// update Allocated to Deserved directly
q.Queue().Status.Allocated.Resources = q.Queue().Status.Deserved.Resources
} else {
// Used > Deserved
// kill pod randomly to make Used <= Deserved
// after the pod is terminated, it will release some resource to other queues
for _, pod := range q.Pods {
// skip if Used <= Deserved
if q.UsedUnderDeserved() {
break
}
// resources released by the killed pod
// they may not be the same as the resources it occupies
releasingResources := make(map[apiv1.ResourceName]resource.Quantity)
// calculate releasing resources of pod
podResources := calculatePodResources(pod)
for k, res := range podResources {
if k != "cpu" && k != "memory" {
continue
}
resType := apiv1.ResourceName(k)
usedResource := q.Queue().Status.Used.Resources[resType].DeepCopy()
deservedResource := q.Queue().Status.Deserved.Resources[resType].DeepCopy()
if usedResource.Cmp(deservedResource) == 1 {
usedResource.Sub(deservedResource)
if usedResource.Cmp(res) <= 0 {
releasingResources[resType] = usedResource
} else {
releasingResources[resType] = res
}
}
result := q.Queue().Status.Used.Resources[resType].DeepCopy()
result.Sub(res)
q.Queue().Status.Used.Resources[resType] = result
}
preemptingPods[pod.Name] = preemptedPodInfo{
pod: pod,
totalReleasingResources: releasingResources,
detailReleasingResources: make(map[string]Resources),
}
glog.V(4).Infof("PreemptResource() pod %s releasing (%#v)", pod.Name, releasingResources)
}
q.Queue().Status.Allocated.Resources = q.Queue().Status.Deserved.Resources
}
glog.V(4).Infof("PreemptResource queuesOverused calculate, queue %s, deverved (%#v), allocated (%#v), used (%#v), preempting (%#v)",
q.Name(), q.Queue().Status.Deserved.Resources, q.Queue().Status.Allocated.Resources,
q.Queue().Status.Used.Resources, q.Queue().Status.Preempting.Resources)
}
p.dataMu.Lock()
for k, v := range preemptingPods {
if err := killPod(p.client, v.pod); err != nil {
// kill pod failed; the pod may already have terminated
// TODO call terminatePodDone later to update the queue
}
p.terminatingPodsForPreempt[k] = v
}
p.dataMu.Unlock()
// do nothing for queuesPerfectused
// handle queuesUnderused, which will preempt resources from other queues
resourceTypes := []string{"cpu", "memory"}
for _, q := range queuesUnderused {
if len(preemptingPods) == 0 {
// there are no preempting pods left
// set Allocated to Deserved directly
q.Queue().Status.Allocated.Resources = q.Queue().Status.Deserved.Resources
} else {
// assign preempting pod resource to each queue
for _, v := range resourceTypes {
resType := apiv1.ResourceName(v)
deserved := q.Queue().Status.Deserved.Resources[resType].DeepCopy()
allocated := q.Queue().Status.Allocated.Resources[resType].DeepCopy()
increased := resource.MustParse("0")
if deserved.Cmp(allocated) > 0 {
deserved.Sub(allocated)
increased = deserved
}
for _, podInfo := range preemptingPods {
if increased.IsZero() {
break
}
releasing, ok := podInfo.totalReleasingResources[resType]
if !ok || releasing.IsZero() {
glog.V(4).Infof("preempting pod %s has no %s resource left", podInfo.pod.Name, resType)
continue
}
if increased.Cmp(releasing) >= 0 {
if podInfo.detailReleasingResources[q.Queue().Namespace] == nil {
podInfo.detailReleasingResources[q.Queue().Namespace] = make(map[apiv1.ResourceName]resource.Quantity)
}
podInfo.detailReleasingResources[q.Queue().Namespace][resType] = releasing
podInfo.totalReleasingResources[resType] = resource.MustParse("0")
increased.Sub(releasing)
} else {
// guard against a nil inner map before recording the per-queue share
if podInfo.detailReleasingResources[q.Queue().Namespace] == nil {
podInfo.detailReleasingResources[q.Queue().Namespace] = make(map[apiv1.ResourceName]resource.Quantity)
}
podInfo.detailReleasingResources[q.Queue().Namespace][resType] = increased.DeepCopy()
releasing.Sub(increased)
podInfo.totalReleasingResources[resType] = releasing
// this queue's demand is now fully covered; zero it so the pod loop stops
increased = resource.MustParse("0")
}
if !increased.IsZero() {
allocated.Add(increased)
if q.Queue().Status.Allocated.Resources == nil {
q.Queue().Status.Allocated.Resources = make(map[apiv1.ResourceName]resource.Quantity)
}
q.Queue().Status.Allocated.Resources[resType] = allocated
}
}
// clean preemptingPods which is empty
for k, podInfo := range preemptingPods {
resourceCpu := podInfo.totalReleasingResources["cpu"].DeepCopy()
resourceMemory := podInfo.totalReleasingResources["memory"].DeepCopy()
if resourceCpu.IsZero() && resourceMemory.IsZero() {
delete(preemptingPods, k)
}
}
}
glog.V(4).Infof("PreemptResource queuesUnderused calculate, queue %s, deverved (%#v), allocated (%#v), used (%#v), preempting (%#v)",
q.Name(), q.Queue().Status.Deserved.Resources, q.Queue().Status.Allocated.Resources,
q.Queue().Status.Used.Resources, q.Queue().Status.Preempting.Resources)
}
if len(preemptingPods) != 0 {
glog.Error("preemptingPod is not empty, preemption may be ERROR")
}
// update Queue to API server under p.updateMu
p.updateMu.Lock()
queueClient, _, err := client.NewClient(p.config)
if err != nil {
return err
}
queueList := apiv1.QueueList{}
err = queueClient.Get().Resource(apiv1.QueuePlural).Do().Into(&queueList)
if err != nil {
return err
}
for _, oldQueue := range queueList.Items {
if len(queuesOverused) == 0 && len(queuesUnderused) == 0 {
break
}
// TODO update allocated and preempting for queuesOverused and queuesUnderused
q, ok := queuesOverused[oldQueue.Name]
if !ok {
q, ok = queuesUnderused[oldQueue.Name]
if !ok {
glog.V(4).Infof("queue %s not exist in queues01(D<A)/queues03(D>A)", oldQueue.Name)
continue
}
}
result := apiv1.Queue{}
err = queueClient.Put().
Resource(apiv1.QueuePlural).
Namespace(q.Queue().Namespace).
Name(q.Queue().Name).
Body(q.Queue()).
Do().Into(&result)
if err != nil {
glog.Errorf("fail to update queue info, name %s, %#v", q.Queue().Name, err)
}
}
p.updateMu.Unlock()
// wait until terminatingPodsForPreempt is empty
p.dataMu.Lock()
for len(p.terminatingPodsForPreempt) != 0 {
p.dataCond.Wait()
}
p.dataMu.Unlock()
return nil
}
func (p *basePreemption) terminatePodDone(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
if !ok {
glog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
return
}
default:
glog.Errorf("cannot convert to *v1.Pod: %v", t)
return
}
p.dataMu.Lock()
// if the pod was killed to make its queue underused, remove it from terminatingPodsForUnderused directly
if _, ok := p.terminatingPodsForUnderused[pod.Name]; ok {
delete(p.terminatingPodsForUnderused, pod.Name)
}
// if the pod was killed for preemption, remove it from terminatingPodsForPreempt and update the Queue
ppInfo, ok := p.terminatingPodsForPreempt[pod.Name]
if ok {
delete(p.terminatingPodsForPreempt, pod.Name)
}
p.dataMu.Unlock()
p.updateMu.Lock()
// update Queue preempting resources, this operation must be under p.updateMu
resourceTypes := []string{"cpu", "memory"}
if ok {
queueClient, _, err := client.NewClient(p.config)
if err != nil {
return
}
queueList := apiv1.QueueList{}
err = queueClient.Get().Resource(apiv1.QueuePlural).Do().Into(&queueList)
if err != nil {
return
}
for _, oldQueue := range queueList.Items {
releasingResource, ok := ppInfo.detailReleasingResources[oldQueue.Namespace]
if !ok {
continue
}
for _, v := range resourceTypes {
resType := apiv1.ResourceName(v)
releasing := releasingResource[resType].DeepCopy()
preempting := oldQueue.Status.Preempting.Resources[resType].DeepCopy()
if releasing.Cmp(preempting) < 0 {
preempting.Sub(releasing)
oldQueue.Status.Preempting.Resources[resType] = preempting
result := oldQueue.Status.Allocated.Resources[resType].DeepCopy()
result.Add(releasing)
oldQueue.Status.Allocated.Resources[resType] = result
} else {
oldQueue.Status.Preempting.Resources[resType] = resource.MustParse("0")
result := oldQueue.Status.Allocated.Resources[resType].DeepCopy()
result.Add(preempting)
oldQueue.Status.Allocated.Resources[resType] = result
}
}
// update Queue
result := apiv1.Queue{}
err = queueClient.Put().
Resource(apiv1.QueuePlural).
Namespace(oldQueue.Namespace).
Name(oldQueue.Name).
Body(oldQueue.DeepCopy()).
Do().Into(&result)
if err != nil {
glog.Errorf("fail to update queue info, name %s, %#v", oldQueue.Name, err)
}
}
}
p.updateMu.Unlock()
p.dataMu.Lock()
if len(p.terminatingPodsForPreempt) == 0 {
p.dataCond.Signal()
}
p.dataMu.Unlock()
}
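A pattern recurs throughout this file: result := m[k].DeepCopy(); result.Add(v); m[k] = result. It is required because Go map values are not addressable, so Quantity's pointer-receiver methods such as Add and Sub cannot be called on them in place. A self-contained sketch of the same pattern, assuming only the apimachinery resource package:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	used := map[string]resource.Quantity{"cpu": resource.MustParse("500m")}
	// used["cpu"].Add(...) would not compile: Add has a pointer receiver and
	// a map value is not addressable. Copy out, mutate, then write back.
	result := used["cpu"].DeepCopy()
	result.Add(resource.MustParse("250m"))
	used["cpu"] = result
	fmt.Println(used["cpu"].String()) // prints "750m"
}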

View File

@@ -97,6 +97,10 @@ func (ps *proportionScheduler) Allocate(
"memory": *resource.NewQuantity(int64(job.Queue().Spec.Weight)*totalMEM/totalWeight, resource.BinarySI),
},
}
+// clear Used resources
+allocatedResult[job.Name()].Queue().Status.Used = apiv1.ResourceList{
+Resources: make(map[apiv1.ResourceName]resource.Quantity),
+}
}
}
return allocatedResult

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/api/core/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
clientv1 "k8s.io/client-go/informers/core/v1"
@@ -378,6 +379,33 @@ func (sc *schedulerCache) addQueue(queue *apiv1.Queue) error {
info := &QueueInfo{
name: queue.Name,
queue: queue.DeepCopy(),
+Pods: make(map[string]*v1.Pod),
}
+// init Deserved/Allocated/Used/Preempting if they are nil
+if info.Queue().Status.Deserved.Resources == nil {
+info.Queue().Status.Deserved.Resources = map[apiv1.ResourceName]resource.Quantity{
+"cpu": resource.MustParse("0"),
+"memory": resource.MustParse("0"),
+}
+}
+if info.Queue().Status.Allocated.Resources == nil {
+info.Queue().Status.Allocated.Resources = map[apiv1.ResourceName]resource.Quantity{
+"cpu": resource.MustParse("0"),
+"memory": resource.MustParse("0"),
+}
+}
+if info.Queue().Status.Used.Resources == nil {
+info.Queue().Status.Used.Resources = map[apiv1.ResourceName]resource.Quantity{
+"cpu": resource.MustParse("0"),
+"memory": resource.MustParse("0"),
+}
+}
+if info.Queue().Status.Preempting.Resources == nil {
+info.Queue().Status.Preempting.Resources = map[apiv1.ResourceName]resource.Quantity{
+"cpu": resource.MustParse("0"),
+"memory": resource.MustParse("0"),
+}
+}
sc.queues[queue.Name] = info
return nil
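The four nil-guards above are intentionally identical. A later cleanup could deduplicate them with a small helper; this is a hypothetical sketch (zeroResources is my name, not the commit's), reusing the apiv1 and resource imports already present in this file:

// zeroResources returns a fresh cpu/memory map initialized to zero,
// so later Add/Sub calls never operate on a nil map.
func zeroResources() map[apiv1.ResourceName]resource.Quantity {
	return map[apiv1.ResourceName]resource.Quantity{
		"cpu":    resource.MustParse("0"),
		"memory": resource.MustParse("0"),
	}
}

Each guard then collapses to a single assignment, e.g. info.Queue().Status.Deserved.Resources = zeroResources() when the map is nil.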

View File

@@ -18,11 +18,30 @@ package schedulercache
import (
apiv1 "github.com/kubernetes-incubator/kube-arbitrator/pkg/apis/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
type QueueInfo struct {
name string
queue *apiv1.Queue
+Pods map[string]*v1.Pod
}
+// true - all resources (cpu/memory) in res1 <= res2
+// false - otherwise
+func compareResources(res1 map[apiv1.ResourceName]resource.Quantity, res2 map[apiv1.ResourceName]resource.Quantity) bool {
+cpu1 := res1["cpu"].DeepCopy()
+cpu2 := res2["cpu"].DeepCopy()
+memory1 := res1["memory"].DeepCopy()
+memory2 := res2["memory"].DeepCopy()
+if cpu1.Cmp(cpu2) <= 0 && memory1.Cmp(memory2) <= 0 {
+return true
+}
+return false
+}
func (r *QueueInfo) Name() string {
@@ -33,10 +52,19 @@ func (r *QueueInfo) Queue() *apiv1.Queue {
return r.queue
}
+func (r *QueueInfo) UsedUnderAllocated() bool {
+return compareResources(r.queue.Status.Used.Resources, r.queue.Status.Allocated.Resources)
+}
+func (r *QueueInfo) UsedUnderDeserved() bool {
+return compareResources(r.queue.Status.Used.Resources, r.queue.Status.Deserved.Resources)
+}
func (r *QueueInfo) Clone() *QueueInfo {
clone := &QueueInfo{
name: r.name,
queue: r.queue.DeepCopy(),
+Pods: r.Pods,
}
return clone
}
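// Note (illustrative, not in the commit): Clone deep-copies the queue object
// but shares the Pods map with the original, so mutations of clone.Pods are
// visible through r.Pods as well.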

View File

@@ -25,6 +25,7 @@ import (
"github.com/kubernetes-incubator/kube-arbitrator/pkg/client"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/controller"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/policy"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/policy/preemption"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/policy/proportion"
"github.com/kubernetes-incubator/kube-arbitrator/pkg/schedulercache"
"github.com/kubernetes-incubator/kube-arbitrator/test/integration/framework"
@@ -223,7 +224,7 @@ func TestArbitrator(t *testing.T) {
defer close(neverStop)
cache := schedulercache.New(config)
go cache.Run(neverStop)
-c := controller.NewQueueController(config, cache, policy.New(proportion.PolicyName))
+c := controller.NewQueueController(config, cache, policy.New(proportion.PolicyName), preemption.New(config))
go c.Run()
// sleep to wait for the scheduler to finish