Add PodDisruptionBudget into cache

This commit is contained in:
Zhe Jin 2018-01-22 14:24:25 +08:00
parent d55917d85c
commit fb2d3c3118
4 changed files with 196 additions and 2 deletions

104
pkg/cache/cache.go vendored
View File

@ -23,8 +23,10 @@ import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
"k8s.io/client-go/informers"
clientv1 "k8s.io/client-go/informers/core/v1"
policyv1 "k8s.io/client-go/informers/policy/v1beta1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
@ -46,10 +48,12 @@ type SchedulerCache struct {
podInformer clientv1.PodInformer
nodeInformer clientv1.NodeInformer
consumerInformer arbclient.ConsumerInformer
pdbInformer policyv1.PodDisruptionBudgetInformer
Pods map[string]*PodInfo
Nodes map[string]*NodeInfo
Consumers map[string]*ConsumerInfo
Pdbs map[string]*PdbInfo
}
func newSchedulerCache(config *rest.Config) *SchedulerCache {
@ -57,6 +61,7 @@ func newSchedulerCache(config *rest.Config) *SchedulerCache {
Nodes: make(map[string]*NodeInfo),
Pods: make(map[string]*PodInfo),
Consumers: make(map[string]*ConsumerInfo),
Pdbs: make(map[string]*PdbInfo),
}
kubecli := kubernetes.NewForConfigOrDie(config)
@ -119,6 +124,25 @@ func newSchedulerCache(config *rest.Config) *SchedulerCache {
},
})
// create informer for pdb information
sc.pdbInformer = informerFactory.Policy().V1beta1().PodDisruptionBudgets()
sc.pdbInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1beta1.PodDisruptionBudget:
glog.V(4).Infof("Filter pdb name(%s)\n", t.Name)
return true
default:
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddPDB,
DeleteFunc: sc.DeletePDB,
},
})
return sc
}
@ -126,6 +150,7 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
go sc.podInformer.Informer().Run(stopCh)
go sc.nodeInformer.Informer().Run(stopCh)
go sc.consumerInformer.Informer().Run(stopCh)
go sc.pdbInformer.Informer().Run(stopCh)
}
func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
@ -168,6 +193,10 @@ func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
}
sc.Consumers[pod.Namespace].AddPod(pi)
for _, pdb := range sc.Pdbs {
sc.Consumers[pod.Namespace].AddPdb(pdb)
}
sc.Pods[key] = pi
return nil
@ -484,6 +513,77 @@ func (sc *SchedulerCache) DeleteConsumer(obj interface{}) {
return
}
func (sc *SchedulerCache) addPDB(pdb *v1beta1.PodDisruptionBudget) error {
pi := NewPdbInfo(pdb)
sc.Pdbs[pi.Name] = pi
for _, c := range sc.Consumers {
c.AddPdb(pi)
}
return nil
}
func (sc *SchedulerCache) deletePDB(pdb *v1beta1.PodDisruptionBudget) error {
pi, exist := sc.Pdbs[pdb.Name]
if !exist {
return nil
}
delete(sc.Pdbs, pdb.Name)
for _, c := range sc.Consumers {
c.RemovePdb(pi)
}
return nil
}
func (sc *SchedulerCache) AddPDB(obj interface{}) {
pdb, ok := obj.(*v1beta1.PodDisruptionBudget)
if !ok {
glog.Errorf("Cannot convert to *v1beta1.PodDisruptionBudget: %v", obj)
return
}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
glog.V(4).Infof("Add PodDisruptionBudget(%s) into cache, spec(%#v)", pdb.Name, pdb.Spec)
err := sc.addPDB(pdb)
if err != nil {
glog.Errorf("Failed to add PodDisruptionBudget %s into cache: %v", pdb.Name, err)
return
}
return
}
func (sc *SchedulerCache) DeletePDB(obj interface{}) {
var pdb *v1beta1.PodDisruptionBudget
switch t := obj.(type) {
case *v1beta1.PodDisruptionBudget:
pdb = t
case cache.DeletedFinalStateUnknown:
var ok bool
pdb, ok = t.Obj.(*v1beta1.PodDisruptionBudget)
if !ok {
glog.Errorf("Cannot convert to *v1beta1.PodDisruptionBudget: %v", t.Obj)
return
}
default:
glog.Errorf("Cannot convert to *v1beta1.PodDisruptionBudget: %v", t)
return
}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
err := sc.deletePDB(pdb)
if err != nil {
glog.Errorf("Failed to delete PodDisruptionBudget %s from cache: %v", pdb.Name, err)
return
}
return
}
func (sc *SchedulerCache) PodInformer() clientv1.PodInformer {
return sc.podInformer
}
@ -496,6 +596,10 @@ func (sc *SchedulerCache) QueueInformer() arbclient.ConsumerInformer {
return sc.consumerInformer
}
func (sc *SchedulerCache) PdbInformer() policyv1.PodDisruptionBudgetInformer {
return sc.pdbInformer
}
func (sc *SchedulerCache) Snapshot() *CacheSnapshot {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

View File

@ -17,9 +17,13 @@ limitations under the License.
package cache
import (
arbv1 "github.com/kubernetes-incubator/kube-arbitrator/pkg/apis/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"github.com/golang/glog"
arbv1 "github.com/kubernetes-incubator/kube-arbitrator/pkg/apis/v1"
)
type ConsumerInfo struct {
@ -93,6 +97,46 @@ func (ci *ConsumerInfo) RemovePod(pi *PodInfo) {
}
}
func (ci *ConsumerInfo) AddPdb(pi *PdbInfo) {
for _, ps := range ci.PodSets {
if len(ps.PdbName) != 0 {
continue
}
selector, err := metav1.LabelSelectorAsSelector(pi.Pdb.Spec.Selector)
if err != nil {
glog.V(4).Infof("LabelSelectorAsSelector fail for pdb %s", pi.Name)
continue
}
// One PDB is fully for one PodSet
// TODO(jinzhej): handle PDB cross different PodSet later on demand
if selector.Matches(labels.Set(ps.Labels)) {
ps.PdbName = pi.Name
if pi.Pdb.Spec.MinAvailable.Type == intstr.Int {
// support integer MinAvailable in PodDisruptionBuget
// TODO(jinzhej): percentage MinAvailable, integer/percentage MaxUnavailable will be supported on demand
ps.MinAvailable = int(pi.Pdb.Spec.MinAvailable.IntVal)
}
}
}
}
func (ci *ConsumerInfo) RemovePdb(pi *PdbInfo) {
for _, ps := range ci.PodSets {
if len(ps.PdbName) == 0 {
continue
}
selector, err := metav1.LabelSelectorAsSelector(pi.Pdb.Spec.Selector)
if err != nil {
glog.V(4).Infof("LabelSelectorAsSelector fail for pdb %s", pi.Name)
continue
}
if selector.Matches(labels.Set(ps.Labels)) {
ps.PdbName = ""
ps.MinAvailable = 0
}
}
}
func (ci *ConsumerInfo) Clone() *ConsumerInfo {
info := &ConsumerInfo{
Name: ci.Name,

33
pkg/cache/pdb_info.go vendored Normal file
View File

@ -0,0 +1,33 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"k8s.io/api/policy/v1beta1"
)
type PdbInfo struct {
Name string
Pdb *v1beta1.PodDisruptionBudget
}
func NewPdbInfo(pdb *v1beta1.PodDisruptionBudget) *PdbInfo {
return &PdbInfo{
Name: pdb.Name,
Pdb: pdb,
}
}

13
pkg/cache/pod_set.go vendored
View File

@ -74,6 +74,9 @@ func (pi *PodInfo) Clone() *PodInfo {
type PodSet struct {
metav1.ObjectMeta
PdbName string
MinAvailable int
Allocated *Resource
TotalRequest *Resource
@ -88,6 +91,8 @@ func NewPodSet(uid types.UID) *PodSet {
Name: string(uid),
UID: uid,
},
PdbName: "",
MinAvailable: 0,
Allocated: EmptyResource(),
TotalRequest: EmptyResource(),
Running: make([]*PodInfo, 0),
@ -108,6 +113,12 @@ func (ps *PodSet) AddPodInfo(pi *PodInfo) {
default:
ps.Others = append(ps.Others, pi)
}
// Update PodSet Labels
// assume all pods in the same PodSet have same labels
if len(ps.Labels) == 0 && len(pi.Pod.Labels) != 0 {
ps.Labels = pi.Pod.Labels
}
}
func (ps *PodSet) DeletePodInfo(pi *PodInfo) {
@ -135,6 +146,8 @@ func (ps *PodSet) Clone() *PodSet {
Name: ps.Name,
UID: ps.UID,
},
PdbName: ps.PdbName,
MinAvailable: ps.MinAvailable,
Allocated: ps.Allocated.Clone(),
TotalRequest: ps.TotalRequest.Clone(),
Running: make([]*PodInfo, 0),