Added events for cluster autoscaler.

Jerzy Szczepkowski 2016-05-20 14:16:39 +02:00
parent f7f070295b
commit 74da5650af
4 changed files with 35 additions and 12 deletions

View File

@@ -24,6 +24,8 @@ import (
"k8s.io/contrib/cluster-autoscaler/config"
"k8s.io/contrib/cluster-autoscaler/simulator"
"k8s.io/contrib/cluster-autoscaler/utils/gce"
kube_api "k8s.io/kubernetes/pkg/api"
kube_record "k8s.io/kubernetes/pkg/client/record"
kube_client "k8s.io/kubernetes/pkg/client/unversioned"
"github.com/golang/glog"
@@ -79,6 +81,11 @@ func main() {
lastScaleDownFailedTrial := time.Now()
underutilizedNodes := make(map[string]time.Time)
+ eventBroadcaster := kube_record.NewBroadcaster()
+ eventBroadcaster.StartLogging(glog.Infof)
+ eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
+ recorder := eventBroadcaster.NewRecorder(kube_api.EventSource{Component: "cluster-autoscaler"})
for {
select {
case <-time.After(time.Minute):
@@ -113,7 +120,7 @@ func main() {
if len(unschedulablePodsToHelp) == 0 {
glog.V(1).Info("No unschedulable pods")
} else {
- scaledUp, err := ScaleUp(unschedulablePodsToHelp, nodes, migConfigs, gceManager, kubeClient, predicateChecker)
+ scaledUp, err := ScaleUp(unschedulablePodsToHelp, nodes, migConfigs, gceManager, kubeClient, predicateChecker, recorder)
if err != nil {
glog.Errorf("Failed to scale up: %v", err)
continue
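
The recorder wiring added in this file is the standard event-recording pattern from the pre-client-go Kubernetes libraries: a broadcaster fans events out to every started sink, StartLogging mirrors them into the component's log, StartRecordingToSink persists them through the API server, and the recorder stamps each event with the cluster-autoscaler source. A minimal sketch of the same setup, pulled into a hypothetical helper (the package name, helper name and the already-constructed kubeClient are assumptions; the individual calls are the ones in the diff):

package autoscaler

import (
	"github.com/golang/glog"

	kube_api "k8s.io/kubernetes/pkg/api"
	kube_record "k8s.io/kubernetes/pkg/client/record"
	kube_client "k8s.io/kubernetes/pkg/client/unversioned"
)

// newAutoscalerRecorder is a sketch; the commit does this inline in main().
func newAutoscalerRecorder(kubeClient *kube_client.Client) kube_record.EventRecorder {
	// Fan-out point: every event sent through the recorder reaches all started sinks.
	eventBroadcaster := kube_record.NewBroadcaster()
	// Sink 1: mirror events into the autoscaler's own log.
	eventBroadcaster.StartLogging(glog.Infof)
	// Sink 2: persist events via the API server ("" = all namespaces), so they show up
	// on the pods themselves, e.g. in `kubectl describe pod`.
	eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
	// Everything recorded here is attributed to the "cluster-autoscaler" component.
	return eventBroadcaster.NewRecorder(kube_api.EventSource{Component: "cluster-autoscaler"})
}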

View File

@@ -29,16 +29,17 @@ import (
// It will never overestimate the number of nodes but is quite likely to provide a number that
// is too small.
type BasicNodeEstimator struct {
- count     int
- cpuSum    resource.Quantity
- memorySum resource.Quantity
- portSum   map[int32]int
+ cpuSum      resource.Quantity
+ memorySum   resource.Quantity
+ portSum     map[int32]int
+ FittingPods map[*kube_api.Pod]struct{}
}
// NewBasicNodeEstimator builds BasicNodeEstimator.
func NewBasicNodeEstimator() *BasicNodeEstimator {
return &BasicNodeEstimator{
- portSum: make(map[int32]int),
+ portSum:     make(map[int32]int),
+ FittingPods: make(map[*kube_api.Pod]struct{}),
}
}
@@ -69,7 +70,7 @@ func (basicEstimator *BasicNodeEstimator) Add(pod *kube_api.Pod) error {
basicEstimator.portSum[port] = 1
}
}
- basicEstimator.count++
+ basicEstimator.FittingPods[pod] = struct{}{}
return nil
}
@@ -108,7 +109,7 @@ func (basicEstimator *BasicNodeEstimator) Estimate(node *kube_api.Node) (int, st
result = maxInt(result, prop)
}
if podCapcaity, ok := node.Status.Capacity[kube_api.ResourcePods]; ok {
- prop := int(math.Ceil(float64(basicEstimator.count) / float64(podCapcaity.Value())))
+ prop := int(math.Ceil(float64(basicEstimator.GetCount()) / float64(podCapcaity.Value())))
buffer.WriteString(fmt.Sprintf("Pods: %d\n", prop))
result = maxInt(result, prop)
}
@@ -121,5 +122,5 @@ func (basicEstimator *BasicNodeEstimator) Estimate(node *kube_api.Node) (int, st
// GetCount returns number of pods included in the estimation.
func (basicEstimator *BasicNodeEstimator) GetCount() int {
- return basicEstimator.count
+ return len(basicEstimator.FittingPods)
}
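
Side note on what the estimator does with these sums: the visible hunks suggest Estimate turns each tracked dimension (CPU, memory, ports, pod capacity) into ceil(demand / single-node capacity) and keeps the maximum, so switching the pod-capacity term from the old counter to GetCount() preserves the result while FittingPods additionally remembers which pods drove it. A tiny, self-contained illustration of that max-of-ceilings arithmetic with made-up numbers (nothing below comes from the commit):

package main

import (
	"fmt"
	"math"
)

func main() {
	// Made-up demand: 10 fitting pods, each asking for 500m CPU and 1Gi of memory.
	// Made-up node: 4000m CPU, 8Gi of memory, room for 110 pods.
	ceilDiv := func(demand, capacity float64) int {
		return int(math.Ceil(demand / capacity))
	}
	estimate := 0
	for _, perDimension := range []int{
		ceilDiv(10*500, 4000), // CPU:    ceil(5000m / 4000m) = 2
		ceilDiv(10*1, 8),      // memory: ceil(10Gi / 8Gi)    = 2
		ceilDiv(10, 110),      // pods:   ceil(10 / 110)      = 1
	} {
		if perDimension > estimate {
			estimate = perDimension
		}
	}
	fmt.Println("estimated nodes:", estimate) // 2: the tightest dimension wins
}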

View File

@@ -47,12 +47,13 @@ func TestEstimate(t *testing.T) {
estimator := NewBasicNodeEstimator()
for i := 0; i < 5; i++ {
- estimator.Add(pod)
+ podCopy := *pod
+ estimator.Add(&podCopy)
}
assert.Equal(t, int64(500*5), estimator.cpuSum.MilliValue())
assert.Equal(t, int64(5*memoryPerPod), estimator.memorySum.Value())
- assert.Equal(t, 5, estimator.count)
+ assert.Equal(t, 5, estimator.GetCount())
node := &kube_api.Node{
Status: kube_api.NodeStatus{
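
The podCopy indirection in this test is forced by the switch from a plain counter to the FittingPods map: the map is keyed by *kube_api.Pod, so adding the same pointer five times would leave GetCount() at 1, while copying the pod each iteration produces five distinct keys. A standalone sketch of that pointer-keyed behaviour (plain local struct instead of kube_api.Pod; names are mine):

package main

import "fmt"

type pod struct{ name string }

func main() {
	fitting := make(map[*pod]struct{})

	p := &pod{name: "frontend"}
	for i := 0; i < 5; i++ {
		fitting[p] = struct{}{} // same pointer every time: the map keeps a single entry
	}
	fmt.Println(len(fitting)) // 1

	for i := 0; i < 5; i++ {
		podCopy := *p // fresh copy, fresh address on every iteration
		fitting[&podCopy] = struct{}{}
	}
	fmt.Println(len(fitting)) // 6: the original entry plus five distinct copies
}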

View File

@@ -24,6 +24,7 @@ import (
"k8s.io/contrib/cluster-autoscaler/simulator"
"k8s.io/contrib/cluster-autoscaler/utils/gce"
kube_api "k8s.io/kubernetes/pkg/api"
+ kube_record "k8s.io/kubernetes/pkg/client/record"
kube_client "k8s.io/kubernetes/pkg/client/unversioned"
"github.com/golang/glog"
@@ -39,7 +40,7 @@ type ExpansionOption struct {
// false if it didn't and error if an error occurred.
func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, migConfigs []*config.MigConfig,
gceManager *gce.GceManager, kubeClient *kube_client.Client,
- predicateChecker *simulator.PredicateChecker) (bool, error) {
+ predicateChecker *simulator.PredicateChecker, recorder kube_record.EventRecorder) (bool, error) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
@@ -58,6 +59,7 @@ func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, migConfi
return false, fmt.Errorf("failed to build node infors for migs: %v", err)
}
+ podsRemainUnshedulable := make(map[*kube_api.Pod]struct{})
for _, migConfig := range migConfigs {
currentSize, err := gceManager.GetMigSize(migConfig)
@@ -90,6 +92,7 @@ func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, migConfi
option.estimator.Add(pod)
} else {
glog.V(2).Infof("Scale-up predicate failed: %v", err)
+ podsRemainUnshedulable[pod] = struct{}{}
}
}
if migHelpsSomePods {
@@ -125,7 +128,18 @@ func ScaleUp(unschedulablePods []*kube_api.Pod, nodes []*kube_api.Node, migConfi
if err := gceManager.SetMigSize(bestOption.migConfig, newSize); err != nil {
return false, fmt.Errorf("failed to set MIG size: %v", err)
}
+ for pod := range bestOption.estimator.FittingPods {
+ recorder.Eventf(pod, kube_api.EventTypeNormal, "TriggeredScaleUp",
+ "pod triggered scale-up, mig: %s, sizes (current/new): %d/%d", bestOption.migConfig.Name, currentSize, newSize)
+ }
return true, nil
}
+ for pod := range podsRemainUnshedulable {
+ recorder.Event(pod, kube_api.EventTypeNormal, "NotTriggerScaleUp",
+ "pod didn't trigger scale-up (it wouldn't fit if a new node is added)")
+ }
return false, nil
}
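
Net effect of the changes to this file: once the chosen MIG resize goes through, every pod tracked in the winning option's FittingPods gets a TriggeredScaleUp event, and when no expansion would help, every pod collected in podsRemainUnshedulable gets a NotTriggerScaleUp event, so the decision becomes visible on the pod itself. A hypothetical helper distilling those two paths (the function, its name and its parameter types are mine, not the commit's; the recorder calls mirror the diff):

package autoscaler

import (
	kube_api "k8s.io/kubernetes/pkg/api"
	kube_record "k8s.io/kubernetes/pkg/client/record"
)

// reportScaleUpDecision is a sketch of the reporting ScaleUp now does inline.
func reportScaleUpDecision(recorder kube_record.EventRecorder, scaledUp bool,
	migName string, currentSize, newSize int,
	fitting, remaining map[*kube_api.Pod]struct{}) {
	if scaledUp {
		// Pods the chosen MIG expansion is expected to make schedulable.
		for pod := range fitting {
			recorder.Eventf(pod, kube_api.EventTypeNormal, "TriggeredScaleUp",
				"pod triggered scale-up, mig: %s, sizes (current/new): %d/%d",
				migName, currentSize, newSize)
		}
		return
	}
	// Pods that would stay pending even on a brand-new node.
	for pod := range remaining {
		recorder.Event(pod, kube_api.EventTypeNormal, "NotTriggerScaleUp",
			"pod didn't trigger scale-up (it wouldn't fit if a new node is added)")
	}
}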