Cluster-autoscaler: include PodDisruptionBudget in drain - part 1/2

This commit is contained in:
Marcin Wielgus 2017-03-06 16:36:09 +01:00
parent 25592dac77
commit 5b4441083a
8 changed files with 207 additions and 44 deletions

View File

@ -66,7 +66,7 @@ func TestNewAutoscalerStatic(t *testing.T) {
},
}
predicateChecker := simulator.NewTestPredicateChecker()
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil)
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil)
a := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry)
assert.IsType(t, &StaticAutoscaler{}, a)
}
@ -103,7 +103,7 @@ func TestNewAutoscalerDynamic(t *testing.T) {
},
}
predicateChecker := simulator.NewTestPredicateChecker()
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil)
listerRegistry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil)
a := NewAutoscaler(opts, predicateChecker, fakeClient, kubeEventRecorder, listerRegistry)
assert.IsType(t, &DynamicAutoscaler{}, a)
}

View File

@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_record "k8s.io/client-go/tools/record"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
policyv1 "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@ -97,7 +98,8 @@ func (sd *ScaleDown) GetCandidatesForScaleDown() []*apiv1.Node {
func (sd *ScaleDown) UpdateUnneededNodes(
nodes []*apiv1.Node,
pods []*apiv1.Pod,
timestamp time.Time) error {
timestamp time.Time,
pdbs []*policyv1.PodDisruptionBudget) error {
currentlyUnneededNodes := make([]*apiv1.Node, 0)
nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, nodes)
@ -128,7 +130,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
// Phase2 - check which nodes can be probably removed using fast drain.
nodesToRemove, newHints, err := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
nil, sd.context.PredicateChecker,
len(currentlyUnneededNodes), true, sd.podLocationHints, sd.usageTracker, timestamp)
len(currentlyUnneededNodes), true, sd.podLocationHints, sd.usageTracker, timestamp, pdbs)
if err != nil {
glog.Errorf("Error while simulating node drains: %v", err)
@ -161,7 +163,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
// TryToScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was
// removed and error if such occured.
func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod) (ScaleDownResult, error) {
func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (ScaleDownResult, error) {
now := time.Now()
candidates := make([]*apiv1.Node, 0)
@ -254,7 +256,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod) (Sca
// We look for only 1 node so new hints may be incomplete.
nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, sd.context.ClientSet,
sd.context.PredicateChecker, 1, false,
sd.podLocationHints, sd.usageTracker, time.Now())
sd.podLocationHints, sd.usageTracker, time.Now(), pdbs)
if err != nil {
return ScaleDownError, fmt.Errorf("Find node to remove failed: %v", err)

View File

@ -76,7 +76,7 @@ func TestFindUnneededNodes(t *testing.T) {
PredicateChecker: simulator.NewTestPredicateChecker(),
}
sd := NewScaleDown(&context)
sd.UpdateUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, []*apiv1.Pod{p1, p2, p3, p4}, time.Now())
sd.UpdateUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, []*apiv1.Pod{p1, p2, p3, p4}, time.Now(), nil)
assert.Equal(t, 1, len(sd.unneededNodes))
addTime, found := sd.unneededNodes["n2"]
@ -85,7 +85,7 @@ func TestFindUnneededNodes(t *testing.T) {
assert.Equal(t, 4, len(sd.nodeUtilizationMap))
sd.unneededNodes["n1"] = time.Now()
sd.UpdateUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, []*apiv1.Pod{p1, p2, p3, p4}, time.Now())
sd.UpdateUnneededNodes([]*apiv1.Node{n1, n2, n3, n4}, []*apiv1.Pod{p1, p2, p3, p4}, time.Now(), nil)
assert.Equal(t, 1, len(sd.unneededNodes))
addTime2, found := sd.unneededNodes["n2"]
@ -207,8 +207,8 @@ func TestScaleDown(t *testing.T) {
ClusterStateRegistry: clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}),
}
scaleDown := NewScaleDown(context)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute))
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2})
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute), nil)
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, nil)
assert.NoError(t, err)
assert.Equal(t, ScaleDownNodeDeleted, result)
assert.Equal(t, n1.Name, getStringFromChan(deletedNodes))
@ -265,8 +265,8 @@ func TestNoScaleDownUnready(t *testing.T) {
// N1 is unready so it requires a bigger unneeded time.
scaleDown := NewScaleDown(context)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, time.Now().Add(-5*time.Minute))
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2})
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, time.Now().Add(-5*time.Minute), nil)
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil)
assert.NoError(t, err)
assert.Equal(t, ScaleDownNoUnneeded, result)
@ -284,8 +284,8 @@ func TestNoScaleDownUnready(t *testing.T) {
// N1 has been unready for 2 hours, ok to delete.
context.CloudProvider = provider
scaleDown = NewScaleDown(context)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, time.Now().Add(-2*time.Hour))
result, err = scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2})
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, time.Now().Add(-2*time.Hour), nil)
result, err = scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil)
assert.NoError(t, err)
assert.Equal(t, ScaleDownNodeDeleted, result)
assert.Equal(t, n1.Name, getStringFromChan(deletedNodes))
@ -364,8 +364,8 @@ func TestScaleDownNoMove(t *testing.T) {
ClusterStateRegistry: clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}),
}
scaleDown := NewScaleDown(context)
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(5*time.Minute))
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2})
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(5*time.Minute), nil)
result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, nil)
assert.NoError(t, err)
assert.Equal(t, ScaleDownNoUnneeded, result)
}

View File

@ -82,6 +82,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
allNodeLister := a.AllNodeLister()
unschedulablePodLister := a.UnschedulablePodLister()
scheduledPodLister := a.ScheduledPodLister()
pdbLister := a.PodDisruptionBudgetLister()
scaleDown := a.scaleDown
autoscalingContext := a.AutoscalingContext
@ -215,6 +216,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
if a.ScaleDownEnabled {
unneededStart := time.Now()
pdbs, err := pdbLister.List()
if err != nil {
glog.Errorf("Failed to list pod disruption budgets: %v", err)
return
}
// In dry run only utilization is updated
calculateUnneededOnly := a.lastScaleUpTime.Add(a.ScaleDownDelay).After(time.Now()) ||
a.lastScaleDownFailedTrial.Add(a.ScaleDownTrialInterval).After(time.Now()) ||
@ -228,7 +235,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
glog.V(4).Infof("Calculating unneeded nodes")
scaleDown.CleanUp(time.Now())
err := scaleDown.UpdateUnneededNodes(allNodes, allScheduled, time.Now())
err = scaleDown.UpdateUnneededNodes(allNodes, allScheduled, time.Now(), pdbs)
if err != nil {
glog.Warningf("Failed to scale down: %v", err)
return
@ -247,7 +254,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
scaleDownStart := time.Now()
metrics.UpdateLastTime("scaledown")
result, err := scaleDown.TryToScaleDown(allNodes, allScheduled)
result, err := scaleDown.TryToScaleDown(allNodes, allScheduled, pdbs)
metrics.UpdateDuration("scaledown", scaleDownStart)
// TODO: revisit result handling

View File

@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
policyv1 "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@ -55,7 +56,9 @@ type NodeToBeRemoved struct {
func FindNodesToRemove(candidates []*apiv1.Node, allNodes []*apiv1.Node, pods []*apiv1.Pod,
client client.Interface, predicateChecker *PredicateChecker, maxCount int,
fastCheck bool, oldHints map[string]string, usageTracker *UsageTracker,
timestamp time.Time) (nodesToRemove []NodeToBeRemoved, podReschedulingHints map[string]string, finalError error) {
timestamp time.Time,
podDisruptionBudgets []*policyv1.PodDisruptionBudget,
) (nodesToRemove []NodeToBeRemoved, podReschedulingHints map[string]string, finalError error) {
nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, allNodes)
result := make([]NodeToBeRemoved, 0)
@ -75,9 +78,11 @@ candidateloop:
if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
if fastCheck {
podsToRemove, err = FastGetPodsToMove(nodeInfo, *skipNodesWithSystemPods, *skipNodesWithLocalStorage)
podsToRemove, err = FastGetPodsToMove(nodeInfo, *skipNodesWithSystemPods, *skipNodesWithLocalStorage,
podDisruptionBudgets)
} else {
podsToRemove, err = DetailedGetPodsForMove(nodeInfo, *skipNodesWithSystemPods, *skipNodesWithLocalStorage, client, int32(*minReplicaCount))
podsToRemove, err = DetailedGetPodsForMove(nodeInfo, *skipNodesWithSystemPods, *skipNodesWithLocalStorage, client, int32(*minReplicaCount),
podDisruptionBudgets)
}
if err != nil {
glog.V(2).Infof("%s: node %s cannot be removed: %v", evaluationType, node.Name, err)
@ -113,7 +118,7 @@ func FindEmptyNodesToRemove(candidates []*apiv1.Node, pods []*apiv1.Pod) []*apiv
for _, node := range candidates {
if nodeInfo, found := nodeNameToNodeInfo[node.Name]; found {
// Should block on all pods.
podsToRemove, err := FastGetPodsToMove(nodeInfo, true, true)
podsToRemove, err := FastGetPodsToMove(nodeInfo, true, true, nil)
if err == nil && len(podsToRemove) == 0 {
result = append(result, node)
}

View File

@ -17,11 +17,15 @@ limitations under the License.
package simulator
import (
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/contrib/cluster-autoscaler/utils/drain"
api "k8s.io/kubernetes/pkg/api"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
policyv1 "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
@ -30,9 +34,10 @@ import (
// is drained. Raises error if there is an unreplicated pod and force option was not specified.
// Based on kubectl drain code. It makes an assumption that RC, DS, Jobs and RS were deleted
// along with their pods (no abandoned pods with dangling created-by annotation). Usefull for fast
// checks. Doesn't check i
func FastGetPodsToMove(nodeInfo *schedulercache.NodeInfo, skipNodesWithSystemPods bool, skipNodesWithLocalStorage bool) ([]*apiv1.Pod, error) {
return drain.GetPodsForDeletionOnNodeDrain(
// checks.
func FastGetPodsToMove(nodeInfo *schedulercache.NodeInfo, skipNodesWithSystemPods bool, skipNodesWithLocalStorage bool,
pdbs []*policyv1.PodDisruptionBudget) ([]*apiv1.Pod, error) {
pods, err := drain.GetPodsForDeletionOnNodeDrain(
nodeInfo.Pods(),
api.Codecs.UniversalDecoder(),
false,
@ -42,6 +47,15 @@ func FastGetPodsToMove(nodeInfo *schedulercache.NodeInfo, skipNodesWithSystemPod
nil,
0,
time.Now())
if err != nil {
return pods, err
}
if err := checkPdbs(pods, pdbs); err != nil {
return []*apiv1.Pod{}, err
}
return pods, nil
}
// DetailedGetPodsForMove returns a list of pods that should be moved elsewhere if the node
@ -49,8 +63,9 @@ func FastGetPodsToMove(nodeInfo *schedulercache.NodeInfo, skipNodesWithSystemPod
// Based on kubectl drain code. It checks whether RC, DS, Jobs and RS that created these pods
// still exist.
func DetailedGetPodsForMove(nodeInfo *schedulercache.NodeInfo, skipNodesWithSystemPods bool,
skipNodesWithLocalStorage bool, client client.Interface, minReplicaCount int32) ([]*apiv1.Pod, error) {
return drain.GetPodsForDeletionOnNodeDrain(
skipNodesWithLocalStorage bool, client client.Interface, minReplicaCount int32,
pdbs []*policyv1.PodDisruptionBudget) ([]*apiv1.Pod, error) {
pods, err := drain.GetPodsForDeletionOnNodeDrain(
nodeInfo.Pods(),
api.Codecs.UniversalDecoder(),
false,
@ -60,4 +75,30 @@ func DetailedGetPodsForMove(nodeInfo *schedulercache.NodeInfo, skipNodesWithSyst
client,
minReplicaCount,
time.Now())
if err != nil {
return pods, err
}
if err := checkPdbs(pods, pdbs); err != nil {
return []*apiv1.Pod{}, err
}
return pods, nil
}
func checkPdbs(pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) error {
// TODO: make it more efficient.
for _, pdb := range pdbs {
selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
if err != nil {
return err
}
for _, pod := range pods {
if selector.Matches(labels.Set(pod.Labels)) {
if pdb.Status.PodDisruptionsAllowed < 1 {
return fmt.Errorf("no enough pod disruption budget to move %s/%s", pod.Namespace, pod.Name)
}
}
}
}
return nil
}

View File

@ -20,7 +20,9 @@ import (
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
policyv1 "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@ -36,7 +38,7 @@ func TestFastGetPodsToMove(t *testing.T) {
Namespace: "ns",
},
}
_, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod1), true, true)
_, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod1), true, true, nil)
assert.Error(t, err)
// Replicated pod
@ -49,7 +51,7 @@ func TestFastGetPodsToMove(t *testing.T) {
},
},
}
r2, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod2), true, true)
r2, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod2), true, true, nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(r2))
assert.Equal(t, pod2, r2[0])
@ -64,7 +66,7 @@ func TestFastGetPodsToMove(t *testing.T) {
},
},
}
r3, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod3), true, true)
r3, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod3), true, true, nil)
assert.NoError(t, err)
assert.Equal(t, 0, len(r3))
@ -78,7 +80,7 @@ func TestFastGetPodsToMove(t *testing.T) {
},
},
}
r4, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod2, pod3, pod4), true, true)
r4, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod2, pod3, pod4), true, true, nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(r4))
assert.Equal(t, pod2, r4[0])
@ -93,7 +95,7 @@ func TestFastGetPodsToMove(t *testing.T) {
},
},
}
_, err = FastGetPodsToMove(schedulercache.NewNodeInfo(pod5), true, true)
_, err = FastGetPodsToMove(schedulercache.NewNodeInfo(pod5), true, true, nil)
assert.Error(t, err)
// Local storage
@ -115,7 +117,7 @@ func TestFastGetPodsToMove(t *testing.T) {
},
},
}
_, err = FastGetPodsToMove(schedulercache.NewNodeInfo(pod6), true, true)
_, err = FastGetPodsToMove(schedulercache.NewNodeInfo(pod6), true, true, nil)
assert.Error(t, err)
// Non-local storage
@ -139,7 +141,78 @@ func TestFastGetPodsToMove(t *testing.T) {
},
},
}
r7, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod7), true, true)
r7, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod7), true, true, nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(r7))
// Pdb blocking
pod8 := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod8",
Namespace: "ns",
Annotations: map[string]string{
"kubernetes.io/created-by": "{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicaSet\"}}",
},
Labels: map[string]string{
"critical": "true",
},
},
Spec: apiv1.PodSpec{},
}
pdb8 := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: "foobar",
Namespace: "ns",
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: intstr.FromInt(1),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"critical": "true",
},
},
},
Status: policyv1.PodDisruptionBudgetStatus{
PodDisruptionsAllowed: 0,
},
}
_, err = FastGetPodsToMove(schedulercache.NewNodeInfo(pod8), true, true, []*policyv1.PodDisruptionBudget{pdb8})
assert.Error(t, err)
// Pdb allowing
pod9 := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod9",
Namespace: "ns",
Annotations: map[string]string{
"kubernetes.io/created-by": "{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicaSet\"}}",
},
Labels: map[string]string{
"critical": "true",
},
},
Spec: apiv1.PodSpec{},
}
pdb9 := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: "foobar",
Namespace: "ns",
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: intstr.FromInt(1),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"critical": "true",
},
},
},
Status: policyv1.PodDisruptionBudgetStatus{
PodDisruptionsAllowed: 1,
},
}
r9, err := FastGetPodsToMove(schedulercache.NewNodeInfo(pod9), true, true, []*policyv1.PodDisruptionBudget{pdb9})
assert.NoError(t, err)
assert.Equal(t, 1, len(r9))
}

View File

@ -23,8 +23,10 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
policyv1 "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
v1lister "k8s.io/kubernetes/pkg/client/listers/core/v1"
v1policylister "k8s.io/kubernetes/pkg/client/listers/policy/v1beta1"
)
// ListerRegistry is a registry providing various listers to list pods or nodes matching conditions
@ -33,22 +35,26 @@ type ListerRegistry interface {
ReadyNodeLister() *ReadyNodeLister
ScheduledPodLister() *ScheduledPodLister
UnschedulablePodLister() *UnschedulablePodLister
PodDisruptionBudgetLister() *PodDisruptionBudgetLister
}
type listerRegistryImpl struct {
allNodeLister *AllNodeLister
readyNodeLister *ReadyNodeLister
scheduledPodLister *ScheduledPodLister
unschedulablePodLister *UnschedulablePodLister
allNodeLister *AllNodeLister
readyNodeLister *ReadyNodeLister
scheduledPodLister *ScheduledPodLister
unschedulablePodLister *UnschedulablePodLister
podDisruptionBudgetLister *PodDisruptionBudgetLister
}
// NewListerRegistry returns a registry providing various listers to list pods or nodes matching conditions
func NewListerRegistry(allNode *AllNodeLister, readyNode *ReadyNodeLister, scheduledPod *ScheduledPodLister, unschedulablePod *UnschedulablePodLister) ListerRegistry {
func NewListerRegistry(allNode *AllNodeLister, readyNode *ReadyNodeLister, scheduledPod *ScheduledPodLister,
unschedulablePod *UnschedulablePodLister, podDisruptionBudgetLister *PodDisruptionBudgetLister) ListerRegistry {
return listerRegistryImpl{
allNodeLister: allNode,
readyNodeLister: readyNode,
scheduledPodLister: scheduledPod,
unschedulablePodLister: unschedulablePod,
allNodeLister: allNode,
readyNodeLister: readyNode,
scheduledPodLister: scheduledPod,
unschedulablePodLister: unschedulablePod,
podDisruptionBudgetLister: podDisruptionBudgetLister,
}
}
@ -58,7 +64,9 @@ func NewListerRegistryWithDefaultListers(kubeClient client.Interface, stopChanne
scheduledPodLister := NewScheduledPodLister(kubeClient, stopChannel)
readyNodeLister := NewReadyNodeLister(kubeClient, stopChannel)
allNodeLister := NewAllNodeLister(kubeClient, stopChannel)
return NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodLister, unschedulablePodLister)
podDisruptionBudgetLister := NewPodDisruptionBudgetLister(kubeClient, stopChannel)
return NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodLister,
unschedulablePodLister, podDisruptionBudgetLister)
}
// AllNodeLister returns the AllNodeLister registered to this registry
@ -81,6 +89,11 @@ func (r listerRegistryImpl) UnschedulablePodLister() *UnschedulablePodLister {
return r.unschedulablePodLister
}
// PodDisruptionBudgetLister returns the podDisruptionBudgetLister registered to this registry
func (r listerRegistryImpl) PodDisruptionBudgetLister() *PodDisruptionBudgetLister {
return r.podDisruptionBudgetLister
}
// UnschedulablePodLister lists unscheduled pods
type UnschedulablePodLister struct {
podLister v1lister.PodLister
@ -210,3 +223,25 @@ func NewAllNodeLister(kubeClient client.Interface, stopchannel <-chan struct{})
nodeLister: nodeLister,
}
}
// PodDisruptionBudgetLister lists all pod disruption budgets
type PodDisruptionBudgetLister struct {
pdbLister v1policylister.PodDisruptionBudgetLister
}
// List returns all nodes
func (lister *PodDisruptionBudgetLister) List() ([]*policyv1.PodDisruptionBudget, error) {
return lister.pdbLister.List(labels.Everything())
}
// NewPodDisruptionBudgetLister builds a pod disruption budget lister.
func NewPodDisruptionBudgetLister(kubeClient client.Interface, stopchannel <-chan struct{}) *PodDisruptionBudgetLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.Policy().RESTClient(), "podpisruptionbudgets", apiv1.NamespaceAll, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
pdbLister := v1policylister.NewPodDisruptionBudgetLister(store)
reflector := cache.NewReflector(listWatcher, &policyv1.PodDisruptionBudget{}, store, time.Hour)
reflector.RunUntil(stopchannel)
return &PodDisruptionBudgetLister{
pdbLister: pdbLister,
}
}