change resourcequotaallocator to queue

This commit is contained in:
Zhe Jin 2017-09-29 10:23:59 +08:00
parent 2a5b8f94aa
commit dc09cb127f
14 changed files with 259 additions and 273 deletions

View File

@ -44,7 +44,7 @@ func Run(opt *options.ServerOption) error {
go cache.Run(neverStop)
// TODO dump cache information and do something
c := controller.NewResourceQuotaAllocatorController(config, cache, proportion.New())
c := controller.NewQueueController(config, cache, proportion.New())
c.Run()
return nil

View File

@ -28,7 +28,7 @@ var (
)
// GroupName is the group name used in this package.
const GroupName = "cr.client-go.k8s.io"
const GroupName = "arbitrator.incubator.k8s.io"
// SchemeGroupVersion is the group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
@ -41,8 +41,8 @@ func Resource(resource string) schema.GroupResource {
// addKnownTypes adds the set of types defined in this package to the supplied scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&ResourceQuotaAllocator{},
&ResourceQuotaAllocatorList{},
&Queue{},
&QueueList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil

View File

@ -31,17 +31,17 @@ import (
runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer"
)
var _ runtime.Object = &ResourceQuotaAllocator{}
var _ metav1.ObjectMetaAccessor = &ResourceQuotaAllocator{}
var _ runtime.Object = &Queue{}
var _ metav1.ObjectMetaAccessor = &Queue{}
var _ runtime.Object = &ResourceQuotaAllocatorList{}
var _ metav1.ListMetaAccessor = &ResourceQuotaAllocatorList{}
var _ runtime.Object = &QueueList{}
var _ metav1.ListMetaAccessor = &QueueList{}
func exampleFuzzerFuncs(codecs runtimeserializer.CodecFactory) []interface{} {
return []interface{}{
func(obj *ResourceQuotaAllocatorList, c fuzz.Continue) {
func(obj *QueueList, c fuzz.Continue) {
c.FuzzNoCustom(obj)
obj.Items = make([]ResourceQuotaAllocator, c.Intn(10))
obj.Items = make([]Queue, c.Intn(10))
for i := range obj.Items {
c.Fuzz(&obj.Items[i])
}

View File

@ -19,33 +19,34 @@ package v1
import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
const ResourceQuotaAllocatorPlural = "resourcequotaallocators"
const QueuePlural = "queues"
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ResourceQuotaAllocator struct {
type Queue struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"`
Spec ResourceQuotaAllocatorSpec `json:"spec"`
Status ResourceQuotaAllocatorStatus `json:"status,omitempty"`
Spec QueueSpec `json:"spec"`
Status QueueStatus `json:"status,omitempty"`
}
type ResourceQuotaAllocatorSpec struct {
Share map[string]intstr.IntOrString `json:"share"`
type QueueSpec struct {
Weight int `json:"weight"`
}
type ResourceQuotaAllocatorStatus struct {
Share ResourceList `json:"share"`
Usage ResourceList `json:"usage"`
type QueueStatus struct {
Deserved ResourceList `json:"deserved"`
Allocated ResourceList `json:"allocated"`
Used ResourceList `json:"used"`
Preempting ResourceList `json:"preempting"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type ResourceQuotaAllocatorList struct {
type QueueList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []ResourceQuotaAllocator `json:"items"`
Items []Queue `json:"items"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

View File

@ -24,7 +24,6 @@ import (
resource "k8s.io/apimachinery/pkg/api/resource"
conversion "k8s.io/apimachinery/pkg/conversion"
runtime "k8s.io/apimachinery/pkg/runtime"
intstr "k8s.io/apimachinery/pkg/util/intstr"
reflect "reflect"
)
@ -33,29 +32,128 @@ import (
// Deprecated: deepcopy registration will go away when static deepcopy is fully implemented.
func GetGeneratedDeepCopyFuncs() []conversion.GeneratedDeepCopyFunc {
return []conversion.GeneratedDeepCopyFunc{
{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error {
in.(*Queue).DeepCopyInto(out.(*Queue))
return nil
}, InType: reflect.TypeOf(&Queue{})},
{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error {
in.(*QueueList).DeepCopyInto(out.(*QueueList))
return nil
}, InType: reflect.TypeOf(&QueueList{})},
{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error {
in.(*QueueSpec).DeepCopyInto(out.(*QueueSpec))
return nil
}, InType: reflect.TypeOf(&QueueSpec{})},
{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error {
in.(*QueueStatus).DeepCopyInto(out.(*QueueStatus))
return nil
}, InType: reflect.TypeOf(&QueueStatus{})},
{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error {
in.(*ResourceList).DeepCopyInto(out.(*ResourceList))
return nil
}, InType: reflect.TypeOf(&ResourceList{})},
{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error {
in.(*ResourceQuotaAllocator).DeepCopyInto(out.(*ResourceQuotaAllocator))
return nil
}, InType: reflect.TypeOf(&ResourceQuotaAllocator{})},
{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error {
in.(*ResourceQuotaAllocatorList).DeepCopyInto(out.(*ResourceQuotaAllocatorList))
return nil
}, InType: reflect.TypeOf(&ResourceQuotaAllocatorList{})},
{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error {
in.(*ResourceQuotaAllocatorSpec).DeepCopyInto(out.(*ResourceQuotaAllocatorSpec))
return nil
}, InType: reflect.TypeOf(&ResourceQuotaAllocatorSpec{})},
{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error {
in.(*ResourceQuotaAllocatorStatus).DeepCopyInto(out.(*ResourceQuotaAllocatorStatus))
return nil
}, InType: reflect.TypeOf(&ResourceQuotaAllocatorStatus{})},
}
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Queue) DeepCopyInto(out *Queue) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Spec = in.Spec
in.Status.DeepCopyInto(&out.Status)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Queue.
func (in *Queue) DeepCopy() *Queue {
if in == nil {
return nil
}
out := new(Queue)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *Queue) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
} else {
return nil
}
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *QueueList) DeepCopyInto(out *QueueList) {
*out = *in
out.TypeMeta = in.TypeMeta
out.ListMeta = in.ListMeta
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]Queue, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QueueList.
func (in *QueueList) DeepCopy() *QueueList {
if in == nil {
return nil
}
out := new(QueueList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *QueueList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
} else {
return nil
}
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *QueueSpec) DeepCopyInto(out *QueueSpec) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QueueSpec.
func (in *QueueSpec) DeepCopy() *QueueSpec {
if in == nil {
return nil
}
out := new(QueueSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *QueueStatus) DeepCopyInto(out *QueueStatus) {
*out = *in
in.Deserved.DeepCopyInto(&out.Deserved)
in.Allocated.DeepCopyInto(&out.Allocated)
in.Used.DeepCopyInto(&out.Used)
in.Preempting.DeepCopyInto(&out.Preempting)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QueueStatus.
func (in *QueueStatus) DeepCopy() *QueueStatus {
if in == nil {
return nil
}
out := new(QueueStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ResourceList) DeepCopyInto(out *ResourceList) {
*out = *in
@ -89,107 +187,3 @@ func (in *ResourceList) DeepCopyObject() runtime.Object {
return nil
}
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ResourceQuotaAllocator) DeepCopyInto(out *ResourceQuotaAllocator) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
in.Status.DeepCopyInto(&out.Status)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceQuotaAllocator.
func (in *ResourceQuotaAllocator) DeepCopy() *ResourceQuotaAllocator {
if in == nil {
return nil
}
out := new(ResourceQuotaAllocator)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *ResourceQuotaAllocator) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
} else {
return nil
}
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ResourceQuotaAllocatorList) DeepCopyInto(out *ResourceQuotaAllocatorList) {
*out = *in
out.TypeMeta = in.TypeMeta
out.ListMeta = in.ListMeta
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]ResourceQuotaAllocator, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceQuotaAllocatorList.
func (in *ResourceQuotaAllocatorList) DeepCopy() *ResourceQuotaAllocatorList {
if in == nil {
return nil
}
out := new(ResourceQuotaAllocatorList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *ResourceQuotaAllocatorList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
} else {
return nil
}
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ResourceQuotaAllocatorSpec) DeepCopyInto(out *ResourceQuotaAllocatorSpec) {
*out = *in
if in.Share != nil {
in, out := &in.Share, &out.Share
*out = make(map[string]intstr.IntOrString, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceQuotaAllocatorSpec.
func (in *ResourceQuotaAllocatorSpec) DeepCopy() *ResourceQuotaAllocatorSpec {
if in == nil {
return nil
}
out := new(ResourceQuotaAllocatorSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ResourceQuotaAllocatorStatus) DeepCopyInto(out *ResourceQuotaAllocatorStatus) {
*out = *in
in.Share.DeepCopyInto(&out.Share)
in.Usage.DeepCopyInto(&out.Usage)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceQuotaAllocatorStatus.
func (in *ResourceQuotaAllocatorStatus) DeepCopy() *ResourceQuotaAllocatorStatus {
if in == nil {
return nil
}
out := new(ResourceQuotaAllocatorStatus)
in.DeepCopyInto(out)
return out
}

View File

@ -30,20 +30,20 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)
const resourceQuotaAllocatorCRDName = crv1.ResourceQuotaAllocatorPlural + "." + crv1.GroupName
const queueCRDName = crv1.QueuePlural + "." + crv1.GroupName
func CreateResourceQuotaAllocatorCRD(clientset apiextensionsclient.Interface) (*apiextensionsv1beta1.CustomResourceDefinition, error) {
func CreateQueueCRD(clientset apiextensionsclient.Interface) (*apiextensionsv1beta1.CustomResourceDefinition, error) {
crd := &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: resourceQuotaAllocatorCRDName,
Name: queueCRDName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: crv1.GroupName,
Version: crv1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.NamespaceScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: crv1.ResourceQuotaAllocatorPlural,
Kind: reflect.TypeOf(crv1.ResourceQuotaAllocator{}).Name(),
Plural: crv1.QueuePlural,
Kind: reflect.TypeOf(crv1.Queue{}).Name(),
},
},
}
@ -54,7 +54,7 @@ func CreateResourceQuotaAllocatorCRD(clientset apiextensionsclient.Interface) (*
// wait for CRD being established
err = wait.Poll(500*time.Millisecond, 60*time.Second, func() (bool, error) {
crd, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Get(resourceQuotaAllocatorCRDName, metav1.GetOptions{})
crd, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Get(queueCRDName, metav1.GetOptions{})
if err != nil {
return false, err
}
@ -73,7 +73,7 @@ func CreateResourceQuotaAllocatorCRD(clientset apiextensionsclient.Interface) (*
return false, err
})
if err != nil {
deleteErr := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Delete(resourceQuotaAllocatorCRDName, nil)
deleteErr := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Delete(queueCRDName, nil)
if deleteErr != nil {
return nil, errors.NewAggregate([]error{err, deleteErr})
}

View File

@ -27,14 +27,14 @@ import (
clientcache "k8s.io/client-go/tools/cache"
)
type ResourceQuotaAllocatorController struct {
type QueueController struct {
cache schedulercache.Cache
allocator policy.Interface
quotaManager *quotaManager
}
func NewResourceQuotaAllocatorController(config *rest.Config, cache schedulercache.Cache, allocator policy.Interface) *ResourceQuotaAllocatorController {
rqaController := &ResourceQuotaAllocatorController{
func NewQueueController(config *rest.Config, cache schedulercache.Cache, allocator policy.Interface) *QueueController {
rqaController := &QueueController{
cache: cache,
allocator: allocator,
quotaManager: &quotaManager{
@ -46,14 +46,14 @@ func NewResourceQuotaAllocatorController(config *rest.Config, cache schedulercac
return rqaController
}
func (r *ResourceQuotaAllocatorController) Run() {
func (r *QueueController) Run() {
go r.quotaManager.run()
wait.Until(r.runOnce, 2*time.Second, wait.NeverStop)
}
func (r *ResourceQuotaAllocatorController) runOnce() {
func (r *QueueController) runOnce() {
snapshot := r.cache.Dump()
jobGroups := r.allocator.Group(snapshot.Allocators)
jobGroups := r.allocator.Group(snapshot.Queues)
allocations := r.allocator.Allocate(jobGroups, snapshot.Nodes)
for _, alloc := range allocations {
r.quotaManager.updateQuota(alloc)

View File

@ -49,15 +49,15 @@ func quotaKeyFunc(obj interface{}) (string, error) {
}
// updateQuota add update request based on ResourceQuotaAllocator to queue
func (qm *quotaManager) updateQuota(allocator *schedulercache.ResourceQuotaAllocatorInfo) {
if allocator == nil {
func (qm *quotaManager) updateQuota(queue *schedulercache.QueueInfo) {
if queue == nil {
return
}
res := updatedResource{
ns: allocator.Allocator().Namespace,
ns: queue.Queue().Namespace,
}
for k, v := range allocator.Allocator().Status.Share.Resources {
for k, v := range queue.Queue().Status.Deserved.Resources {
switch k {
case "cpu":
if cpu, ok := v.AsInt64(); ok {

View File

@ -29,16 +29,16 @@ type Interface interface {
Initialize()
// Group grouping the job into different bucket, and allocate those resources based on those groups.
Group(jobs []*schedulercache.ResourceQuotaAllocatorInfo) map[string][]*schedulercache.ResourceQuotaAllocatorInfo
Group(jobs []*schedulercache.QueueInfo) map[string][]*schedulercache.QueueInfo
// Allocate allocates the cluster's resources into each group.
Allocate(jobGroup map[string][]*schedulercache.ResourceQuotaAllocatorInfo, nodes []*schedulercache.NodeInfo) map[string]*schedulercache.ResourceQuotaAllocatorInfo
Allocate(jobGroup map[string][]*schedulercache.QueueInfo, nodes []*schedulercache.NodeInfo) map[string]*schedulercache.QueueInfo
// Assign allocates resources of group into each jobs.
Assign(jobs []*schedulercache.ResourceQuotaAllocatorInfo, alloc *schedulercache.ResourceQuotaAllocatorInfo) *schedulercache.Resource
Assign(jobs []*schedulercache.QueueInfo, alloc *schedulercache.QueueInfo) *schedulercache.Resource
// Polish returns the Pods that should be evict to release resources.
Polish(job *schedulercache.ResourceQuotaAllocatorInfo, res *schedulercache.Resource) []*schedulercache.ResourceQuotaAllocatorInfo
Polish(job *schedulercache.QueueInfo, res *schedulercache.Resource) []*schedulercache.QueueInfo
// UnIntialize un-initializes the allocator plugins.
UnInitialize()

View File

@ -48,20 +48,20 @@ func (ps *proportionScheduler) Initialize() {
}
func (ps *proportionScheduler) Group(
jobs []*schedulercache.ResourceQuotaAllocatorInfo,
) map[string][]*schedulercache.ResourceQuotaAllocatorInfo {
groups := make(map[string][]*schedulercache.ResourceQuotaAllocatorInfo)
jobs []*schedulercache.QueueInfo,
) map[string][]*schedulercache.QueueInfo {
groups := make(map[string][]*schedulercache.QueueInfo)
for _, job := range jobs {
groups[job.Allocator().Namespace] = append(groups[job.Allocator().Namespace], job)
groups[job.Queue().Namespace] = append(groups[job.Queue().Namespace], job)
}
return groups
}
func (ps *proportionScheduler) Allocate(
jobGroup map[string][]*schedulercache.ResourceQuotaAllocatorInfo,
jobGroup map[string][]*schedulercache.QueueInfo,
nodes []*schedulercache.NodeInfo,
) map[string]*schedulercache.ResourceQuotaAllocatorInfo {
) map[string]*schedulercache.QueueInfo {
totalCPU := int64(0)
totalMEM := int64(0)
for _, node := range nodes {
@ -79,9 +79,7 @@ func (ps *proportionScheduler) Allocate(
totalWeight := int64(0)
for _, jobs := range jobGroup {
for _, job := range jobs {
if weight, ok := job.Allocator().Spec.Share["weight"]; ok {
totalWeight += int64(weight.IntValue())
}
totalWeight += int64(job.Queue().Spec.Weight)
}
}
glog.V(4).Infof("proportion scheduler, total cpu %d, total memory %d, total weight %d", totalCPU, totalMEM, totalWeight)
@ -91,17 +89,15 @@ func (ps *proportionScheduler) Allocate(
return nil
}
allocatedResult := make(map[string]*schedulercache.ResourceQuotaAllocatorInfo)
allocatedResult := make(map[string]*schedulercache.QueueInfo)
for _, jobs := range jobGroup {
for _, job := range jobs {
if weight, ok := job.Allocator().Spec.Share["weight"]; ok {
allocatedResult[job.Name()] = job.Clone()
allocatedResult[job.Name()].Allocator().Status.Share = apiv1.ResourceList{
Resources: map[apiv1.ResourceName]resource.Quantity{
"cpu": *resource.NewQuantity(int64(weight.IntValue())*totalCPU/totalWeight, resource.DecimalSI),
"memory": *resource.NewQuantity(int64(weight.IntValue())*totalMEM/totalWeight, resource.BinarySI),
},
}
allocatedResult[job.Name()] = job.Clone()
allocatedResult[job.Name()].Queue().Status.Deserved = apiv1.ResourceList{
Resources: map[apiv1.ResourceName]resource.Quantity{
"cpu": *resource.NewQuantity(int64(job.Queue().Spec.Weight)*totalCPU/totalWeight, resource.DecimalSI),
"memory": *resource.NewQuantity(int64(job.Queue().Spec.Weight)*totalMEM/totalWeight, resource.BinarySI),
},
}
}
}
@ -109,17 +105,17 @@ func (ps *proportionScheduler) Allocate(
}
func (ps *proportionScheduler) Assign(
jobs []*schedulercache.ResourceQuotaAllocatorInfo,
alloc *schedulercache.ResourceQuotaAllocatorInfo,
jobs []*schedulercache.QueueInfo,
alloc *schedulercache.QueueInfo,
) *schedulercache.Resource {
// TODO
return nil
}
func (ps *proportionScheduler) Polish(
job *schedulercache.ResourceQuotaAllocatorInfo,
job *schedulercache.QueueInfo,
res *schedulercache.Resource,
) []*schedulercache.ResourceQuotaAllocatorInfo {
) []*schedulercache.QueueInfo {
// TODO
return nil
}

View File

@ -49,16 +49,16 @@ type schedulerCache struct {
nodeInformer clientv1.NodeInformer
rqaController cache.Controller
pods map[string]*PodInfo
nodes map[string]*NodeInfo
resourceQuotaAllocators map[string]*ResourceQuotaAllocatorInfo
pods map[string]*PodInfo
nodes map[string]*NodeInfo
queues map[string]*QueueInfo
}
func newSchedulerCache(config *rest.Config) *schedulerCache {
sc := &schedulerCache{
nodes: make(map[string]*NodeInfo),
pods: make(map[string]*PodInfo),
resourceQuotaAllocators: make(map[string]*ResourceQuotaAllocatorInfo),
nodes: make(map[string]*NodeInfo),
pods: make(map[string]*PodInfo),
queues: make(map[string]*QueueInfo),
}
kubecli := kubernetes.NewForConfigOrDie(config)
@ -95,13 +95,13 @@ func newSchedulerCache(config *rest.Config) *schedulerCache {
},
})
// create resourcequotaallocator resource first
err := createResourceQuotaAllocatorCRD(config)
// create queue resource first
err := createQueueCRD(config)
if err != nil {
panic(err)
}
// create informer/controller
sc.rqaController, err = createResourceQuotaAllocatorCRDController(config, sc)
sc.rqaController, err = createQueueCRDController(config, sc)
if err != nil {
panic(err)
}
@ -109,37 +109,37 @@ func newSchedulerCache(config *rest.Config) *schedulerCache {
return sc
}
func createResourceQuotaAllocatorCRD(config *rest.Config) error {
func createQueueCRD(config *rest.Config) error {
extensionscs, err := apiextensionsclient.NewForConfig(config)
if err != nil {
return err
}
_, err = client.CreateResourceQuotaAllocatorCRD(extensionscs)
_, err = client.CreateQueueCRD(extensionscs)
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return nil
}
func createResourceQuotaAllocatorCRDController(config *rest.Config, sc *schedulerCache) (cache.Controller, error) {
func createQueueCRDController(config *rest.Config, sc *schedulerCache) (cache.Controller, error) {
rqaClient, _, err := client.NewClient(config)
if err != nil {
return nil, err
}
source := cache.NewListWatchFromClient(
rqaClient,
apiv1.ResourceQuotaAllocatorPlural,
apiv1.QueuePlural,
v1.NamespaceAll,
fields.Everything())
_, controller := cache.NewInformer(
source,
&apiv1.ResourceQuotaAllocator{},
&apiv1.Queue{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddResourceQuotaAllocator,
UpdateFunc: sc.UpdateResourceQuotaAllocator,
DeleteFunc: sc.DeleteResourceQuotaAllocator,
AddFunc: sc.AddQueue,
UpdateFunc: sc.UpdateQueue,
DeleteFunc: sc.DeleteQueue,
})
return controller, nil
@ -370,104 +370,104 @@ func (sc *schedulerCache) DeleteNode(obj interface{}) {
}
// Assumes that lock is already acquired.
func (sc *schedulerCache) addResourceQuotaAllocator(rqa *apiv1.ResourceQuotaAllocator) error {
if _, ok := sc.resourceQuotaAllocators[rqa.Name]; ok {
return fmt.Errorf("resourceQuotaAllocator %v exist", rqa.Name)
func (sc *schedulerCache) addQueue(queue *apiv1.Queue) error {
if _, ok := sc.queues[queue.Name]; ok {
return fmt.Errorf("queue %v exist", queue.Name)
}
info := &ResourceQuotaAllocatorInfo{
name: rqa.Name,
allocator: rqa.DeepCopy(),
info := &QueueInfo{
name: queue.Name,
queue: queue.DeepCopy(),
}
sc.resourceQuotaAllocators[rqa.Name] = info
sc.queues[queue.Name] = info
return nil
}
// Assumes that lock is already acquired.
func (sc *schedulerCache) updateResourceQuotaAllocator(oldRqa, newRqa *apiv1.ResourceQuotaAllocator) error {
if err := sc.deleteResourceQuotaAllocator(oldRqa); err != nil {
func (sc *schedulerCache) updateQueue(oldQueue, newQueue *apiv1.Queue) error {
if err := sc.deleteQueue(oldQueue); err != nil {
return err
}
sc.addResourceQuotaAllocator(newRqa)
sc.addQueue(newQueue)
return nil
}
// Assumes that lock is already acquired.
func (sc *schedulerCache) deleteResourceQuotaAllocator(rqa *apiv1.ResourceQuotaAllocator) error {
if _, ok := sc.resourceQuotaAllocators[rqa.Name]; !ok {
return fmt.Errorf("resourceQuotaAllocator %v doesn't exist", rqa.Name)
func (sc *schedulerCache) deleteQueue(queue *apiv1.Queue) error {
if _, ok := sc.queues[queue.Name]; !ok {
return fmt.Errorf("queue %v doesn't exist", queue.Name)
}
delete(sc.resourceQuotaAllocators, rqa.Name)
delete(sc.queues, queue.Name)
return nil
}
func (sc *schedulerCache) AddResourceQuotaAllocator(obj interface{}) {
rqa, ok := obj.(*apiv1.ResourceQuotaAllocator)
func (sc *schedulerCache) AddQueue(obj interface{}) {
queue, ok := obj.(*apiv1.Queue)
if !ok {
glog.Errorf("cannot convert to *apiv1.ResourceQuotaAllocator: %v", obj)
glog.Errorf("cannot convert to *apiv1.Queue: %v", obj)
return
}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
glog.V(4).Infof("ADD Allocator(%s) into cache, status(%#v), spec(%#v)\n", rqa.Name, rqa.Status, rqa.Spec)
err := sc.addResourceQuotaAllocator(rqa)
glog.V(4).Infof("ADD queue(%s) into cache, status(%#v), spec(%#v)\n", queue.Name, queue.Status, queue.Spec)
err := sc.addQueue(queue)
if err != nil {
glog.Errorf("failed to add allocator %s into cache: %v", rqa.Name, err)
glog.Errorf("failed to add queue %s into cache: %v", queue.Name, err)
return
}
return
}
func (sc *schedulerCache) UpdateResourceQuotaAllocator(oldObj, newObj interface{}) {
oldRqa, ok := oldObj.(*apiv1.ResourceQuotaAllocator)
func (sc *schedulerCache) UpdateQueue(oldObj, newObj interface{}) {
oldQueue, ok := oldObj.(*apiv1.Queue)
if !ok {
glog.Errorf("cannot convert oldObj to *apiv1.ResourceQuotaAllocator: %v", oldObj)
glog.Errorf("cannot convert oldObj to *apiv1.Queue: %v", oldObj)
return
}
newRqa, ok := newObj.(*apiv1.ResourceQuotaAllocator)
newQueue, ok := newObj.(*apiv1.Queue)
if !ok {
glog.Errorf("cannot convert newObj to *apiv1.ResourceQuotaAllocator: %v", newObj)
glog.Errorf("cannot convert newObj to *apiv1.Queue: %v", newObj)
return
}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
glog.V(4).Infof("UPDATE oldAllocator(%s) in cache, status(%#v), spec(%#v)\n", oldRqa.Name, oldRqa.Status, oldRqa.Spec)
glog.V(4).Infof("UPDATE newAllocator(%s) in cache, status(%#v), spec(%#v)\n", newRqa.Name, newRqa.Status, newRqa.Spec)
err := sc.updateResourceQuotaAllocator(oldRqa, newRqa)
glog.V(4).Infof("UPDATE oldQueue(%s) in cache, status(%#v), spec(%#v)\n", oldQueue.Name, oldQueue.Status, oldQueue.Spec)
glog.V(4).Infof("UPDATE newQueue(%s) in cache, status(%#v), spec(%#v)\n", newQueue.Name, newQueue.Status, newQueue.Spec)
err := sc.updateQueue(oldQueue, newQueue)
if err != nil {
glog.Errorf("failed to update allocator %s into cache: %v", oldRqa.Name, err)
glog.Errorf("failed to update queue %s into cache: %v", oldQueue.Name, err)
return
}
return
}
func (sc *schedulerCache) DeleteResourceQuotaAllocator(obj interface{}) {
var rqa *apiv1.ResourceQuotaAllocator
func (sc *schedulerCache) DeleteQueue(obj interface{}) {
var queue *apiv1.Queue
switch t := obj.(type) {
case *apiv1.ResourceQuotaAllocator:
rqa = t
case *apiv1.Queue:
queue = t
case cache.DeletedFinalStateUnknown:
var ok bool
rqa, ok = t.Obj.(*apiv1.ResourceQuotaAllocator)
queue, ok = t.Obj.(*apiv1.Queue)
if !ok {
glog.Errorf("cannot convert to *v1.Node: %v", t.Obj)
glog.Errorf("cannot convert to *v1.Queue: %v", t.Obj)
return
}
default:
glog.Errorf("cannot convert to *v1.Node: %v", t)
glog.Errorf("cannot convert to *v1.Queue: %v", t)
return
}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
err := sc.deleteResourceQuotaAllocator(rqa)
err := sc.deleteQueue(queue)
if err != nil {
glog.Errorf("failed to delete allocator %s from cache: %v", rqa.Name, err)
glog.Errorf("failed to delete queue %s from cache: %v", queue.Name, err)
return
}
return
@ -478,9 +478,9 @@ func (sc *schedulerCache) Dump() *CacheSnapshot {
defer sc.Mutex.Unlock()
snapshot := &CacheSnapshot{
Nodes: make([]*NodeInfo, 0, len(sc.nodes)),
Pods: make([]*PodInfo, 0, len(sc.pods)),
Allocators: make([]*ResourceQuotaAllocatorInfo, 0, len(sc.resourceQuotaAllocators)),
Nodes: make([]*NodeInfo, 0, len(sc.nodes)),
Pods: make([]*PodInfo, 0, len(sc.pods)),
Queues: make([]*QueueInfo, 0, len(sc.queues)),
}
for _, value := range sc.nodes {
@ -489,8 +489,8 @@ func (sc *schedulerCache) Dump() *CacheSnapshot {
for _, value := range sc.pods {
snapshot.Pods = append(snapshot.Pods, value.Clone())
}
for _, value := range sc.resourceQuotaAllocators {
snapshot.Allocators = append(snapshot.Allocators, value.Clone())
for _, value := range sc.queues {
snapshot.Queues = append(snapshot.Queues, value.Clone())
}
return snapshot
}

View File

@ -17,7 +17,7 @@ limitations under the License.
package schedulercache
type CacheSnapshot struct {
Pods []*PodInfo
Nodes []*NodeInfo
Allocators []*ResourceQuotaAllocatorInfo
Pods []*PodInfo
Nodes []*NodeInfo
Queues []*QueueInfo
}

View File

@ -20,23 +20,23 @@ import (
apiv1 "github.com/kubernetes-incubator/kube-arbitrator/pkg/apis/v1"
)
type ResourceQuotaAllocatorInfo struct {
name string
allocator *apiv1.ResourceQuotaAllocator
type QueueInfo struct {
name string
queue *apiv1.Queue
}
func (r *ResourceQuotaAllocatorInfo) Name() string {
func (r *QueueInfo) Name() string {
return r.name
}
func (r *ResourceQuotaAllocatorInfo) Allocator() *apiv1.ResourceQuotaAllocator {
return r.allocator
func (r *QueueInfo) Queue() *apiv1.Queue {
return r.queue
}
func (r *ResourceQuotaAllocatorInfo) Clone() *ResourceQuotaAllocatorInfo {
clone := &ResourceQuotaAllocatorInfo{
name: r.name,
allocator: r.allocator.DeepCopy(),
func (r *QueueInfo) Clone() *QueueInfo {
clone := &QueueInfo{
name: r.name,
queue: r.queue.DeepCopy(),
}
return clone
}

View File

@ -33,7 +33,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
)
@ -152,32 +151,28 @@ func prepareCRD(config *restclient.Config) error {
return fmt.Errorf("fail to create crd client, %#v", err)
}
crd01 := &apiv1.ResourceQuotaAllocator{
crd01 := &apiv1.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "allocator01",
Namespace: "ns01",
},
Spec: apiv1.ResourceQuotaAllocatorSpec{
Share: map[string]intstr.IntOrString{
"weight": intstr.FromInt(1),
},
Spec: apiv1.QueueSpec{
Weight: 1,
},
}
crd02 := &apiv1.ResourceQuotaAllocator{
crd02 := &apiv1.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "allocator02",
Namespace: "ns02",
},
Spec: apiv1.ResourceQuotaAllocatorSpec{
Share: map[string]intstr.IntOrString{
"weight": intstr.FromInt(2),
},
Spec: apiv1.QueueSpec{
Weight: 2,
},
}
var result apiv1.ResourceQuotaAllocator
var result apiv1.Queue
err = crdClient.Post().
Resource(apiv1.ResourceQuotaAllocatorPlural).
Resource(apiv1.QueuePlural).
Namespace(crd01.Namespace).
Body(crd01).
Do().Into(&result)
@ -185,7 +180,7 @@ func prepareCRD(config *restclient.Config) error {
return fmt.Errorf("fail to create crd crd01, %#v", err)
}
err = crdClient.Post().
Resource(apiv1.ResourceQuotaAllocatorPlural).
Resource(apiv1.QueuePlural).
Namespace(crd02.Namespace).
Body(crd02).
Do().Into(&result)